Initial commit: Douyin play count tracking system

Features:
- Douyin play count scraper using Selenium + Chrome DevTools Protocol
- Automated scheduler for daily data collection
- MongoDB data storage
- Mini-program API server
- Data analysis and visualization tools

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Qyir 2025-10-17 10:48:52 +08:00
commit 53160420d1
13 changed files with 3029 additions and 0 deletions

54
.gitignore vendored Normal file
View File

@ -0,0 +1,54 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# Logs
*.log
logs/
miniprogram_api.log
# Data files
douyin_cdp_play_vv_*.json
douyin_cdp_play_vv_*.txt
# Chrome profiles and drivers
drivers/
config/
# Environment variables
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# IDE
.vscode/
.idea/
*.swp
*.swo
# OS
.DS_Store
Thumbs.db

133
README_定时器.md Normal file
View File

@ -0,0 +1,133 @@
# 抖音播放量自动抓取定时器 - 跨平台版本
## 🎯 概述
这是一个跨平台的Python定时器可以在任何电脑上运行自动在每晚22:00执行抖音播放量抓取任务。
## 📋 支持平台
- ✅ **Windows** (Windows 10/11)
- ✅ **macOS** (Intel/Apple Silicon)
- ✅ **Linux** (Ubuntu/CentOS等)
## 🚀 快速开始
### 1. 安装依赖
```bash
pip install -r config/requirements.txt
pip install schedule
```
### 2. 启动定时器
```bash
# 启动定时器
python douyin_auto_scheduler.py
# 测试模式(立即执行一次)
python douyin_auto_scheduler.py --test
# 只执行一次
python douyin_auto_scheduler.py --once
# 显示帮助
python douyin_auto_scheduler.py --help
```
### 3. 停止定时器
`Ctrl+C` 停止定时器
## 📁 文件说明
- `douyin_auto_scheduler.py` - 主定时器文件
- `start_scheduler.py` - 简化启动脚本
- `scripts/douyin_selenium_cdp_play_vv.py` - 抓取脚本
- `config/requirements.txt` - Python依赖
## ⚙️ 配置说明
### 执行时间
- **主执行时间**: 每晚 22:00
- **备用执行时间**: 每晚 22:05
### 日志文件
- 定时器日志: `logs/scheduler.log`
- 抓取任务日志: `logs/auto_update_YYYYMMDD.log`
## 🔧 使用方法
### macOS/Linux 后台运行
```bash
# 使用nohup在后台运行
nohup python douyin_auto_scheduler.py > scheduler.log 2>&1 &
# 查看进程
ps aux | grep douyin_auto_scheduler
# 停止进程
pkill -f douyin_auto_scheduler
```
### Windows 后台运行
```bash
# 使用start命令在后台运行
start /B python douyin_auto_scheduler.py
# 查看任务管理器中的Python进程
```
### 首次使用
1. 运行测试模式确保环境正常
2. 手动登录抖音账号保存登录状态
3. 启动定时器
## 🎯 功能特点
- ✅ **跨平台兼容** - 支持所有主流操作系统
- ✅ **自动执行** - 每晚22:00自动运行
- ✅ **状态监控** - 实时显示下次执行时间
- ✅ **错误恢复** - 自动重试机制
- ✅ **日志记录** - 详细的执行日志
- ✅ **数据库保存** - 自动保存到MongoDB
## 📞 故障排除
### 常见问题
1. **Python依赖问题**
```bash
pip install --upgrade pip
pip install -r config/requirements.txt
pip install schedule
```
2. **Chrome浏览器问题**
- 确保Chrome浏览器已安装
- 确保chromedriver可用
3. **MongoDB连接问题**
- 确保MongoDB服务运行
- 检查连接配置
4. **网络连接问题**
- 确保网络连接正常
- 检查防火墙设置
### 日志检查
```bash
# 查看定时器日志
tail -f logs/scheduler.log
# 查看抓取任务日志
ls -la logs/auto_update_*.log
```
## 💡 提示
- 首次使用建议先运行测试模式
- 确保计算机在22:00处于开机状态
- 定期检查日志文件确认任务正常执行
- 如需修改执行时间,编辑 `douyin_auto_scheduler.py` 中的时间设置
## 🎉 完成设置
设置完成后系统将在每晚22:00自动执行无需手动操作

216
docs/API接口文档.md Normal file
View File

@ -0,0 +1,216 @@
# 抖音播放量数据API接口文档
## 🚀 服务器信息
- **服务器地址**: `http://localhost:5000``http://你的服务器IP:5000`
- **协议**: HTTP
- **数据格式**: JSON
- **编码**: UTF-8
- **跨域支持**: 已配置CORS支持小程序调用
## 📋 API接口列表
### 1. 获取最新播放量数据
**接口地址**: `GET /api/latest`
**功能说明**: 获取最新一批的抖音剧本播放量数据,按排名排序
**请求参数**:
- `limit` (可选): 返回数据条数默认50条
**请求示例**:
```
GET /api/latest?limit=10
```
**返回数据格式**:
```json
{
"success": true,
"data": [
{
"rank": 1,
"script_name": "九尾狐男妖爱上我更新37集",
"playcount": "2.1亿",
"playcount_number": 210000000.0,
"update_time": "2025-10-15 18:39:29"
}
],
"total": 35,
"update_time": "2025-10-15 18:39:29"
}
```
### 2. 搜索剧本
**接口地址**: `GET /api/search`
**功能说明**: 根据剧本名称进行模糊搜索
**请求参数**:
- `name` (必需): 搜索关键词
**请求示例**:
```
GET /api/search?name=九尾狐
```
**返回数据格式**:
```json
{
"success": true,
"data": [
{
"rank": 1,
"script_name": "九尾狐男妖爱上我更新37集",
"playcount": "2.1亿",
"playcount_number": 210000000.0,
"update_time": "2025-10-15 18:39:29"
}
],
"total": 1,
"search_keyword": "九尾狐"
}
```
### 3. 获取热门剧本
**接口地址**: `GET /api/top`
**功能说明**: 获取播放量最高的剧本列表
**请求参数**:
- `limit` (可选): 返回数据条数默认10条
**请求示例**:
```
GET /api/top?limit=5
```
**返回数据格式**:
```json
{
"success": true,
"data": [
{
"rank": 1,
"script_name": "九尾狐男妖爱上我更新37集",
"playcount": "2.1亿",
"playcount_number": 210000000.0,
"update_time": "2025-10-15 18:39:29"
}
],
"total": 5
}
```
### 4. 服务器状态
**接口地址**: `GET /api/status`
**功能说明**: 获取API服务器和数据库状态
**请求示例**:
```
GET /api/status
```
**返回数据格式**:
```json
{
"success": true,
"mongodb_status": "连接正常",
"total_records": 35,
"latest_update": "2025-10-15 18:39:29",
"server_time": "2025-10-15 18:54:45"
}
```
## 📱 小程序调用示例
### 微信小程序示例代码
```javascript
// 获取最新数据
wx.request({
url: 'http://你的服务器IP:5000/api/latest',
method: 'GET',
data: {
limit: 20
},
success: function(res) {
console.log('获取数据成功:', res.data);
if (res.data.success) {
// 处理数据
const scripts = res.data.data;
// 更新页面数据
}
},
fail: function(err) {
console.error('请求失败:', err);
}
});
// 搜索剧本
wx.request({
url: 'http://你的服务器IP:5000/api/search',
method: 'GET',
data: {
name: '九尾狐'
},
success: function(res) {
if (res.data.success) {
const searchResults = res.data.data;
// 显示搜索结果
}
}
});
```
## 🔄 数据更新机制
1. **自动更新**: 每天24:00自动运行抓取脚本
2. **实时同步**: 抓取完成后API立即返回最新数据
3. **排序规则**: 数据按播放量自动排序,最高播放量排第一
## 📊 数据字段说明
| 字段名 | 类型 | 说明 |
|--------|------|------|
| rank | number | 排名1为最高 |
| script_name | string | 剧本名称 |
| playcount | string | 播放量文本(如"2.1亿" |
| playcount_number | number | 播放量数值(用于排序) |
| update_time | string | 更新时间 |
## ⚠️ 注意事项
1. **服务器地址**: 请将`localhost`替换为实际的服务器IP地址
2. **端口配置**: 默认端口5000可在服务器代码中修改
3. **数据更新**: 数据每天更新一次,建议小程序缓存数据
4. **错误处理**: 请在小程序中添加网络错误处理逻辑
## 🛠️ 启动API服务器
```bash
# 安装依赖
pip install -r requirements.txt
# 启动服务器
python douyin_api_server.py
```
服务器启动后会显示:
```
🚀 启动抖音播放量API服务器...
📡 API地址: http://localhost:5000
```
## 📞 技术支持
如有问题,请检查:
1. MongoDB是否正常运行
2. 服务器端口是否被占用
3. 网络连接是否正常
4. 数据是否已更新

51
docs/README.md Normal file
View File

@ -0,0 +1,51 @@
# 抖音合集数据抓取工具
这是一个用于抓取抖音合集播放数据的Python脚本。
## 功能特点
- 使用Selenium处理动态加载的内容
- 多种数据提取策略页面源码、DOM元素、网络请求
- 反爬虫机制规避
- 错误处理和重试机制
- 数据保存为JSON格式
## 安装依赖
```bash
pip3 install -r requirements.txt
```
## 使用方法
1. 确保已安装Chrome浏览器
2. 运行脚本:
```bash
python3 douyin_scraper.py
```
## 注意事项
1. **法律合规**: 请确保您的使用符合抖音的服务条款和相关法律法规
2. **频率控制**: 避免过于频繁的请求,以免被反爬虫机制阻止
3. **数据使用**: 抓取的数据仅供学习和研究使用,请勿用于商业用途
4. **Chrome驱动**: 脚本会自动管理Chrome驱动但请确保Chrome浏览器已安装
## 可能遇到的问题
1. **ChromeDriver问题**: 如果遇到驱动问题请确保Chrome浏览器版本与ChromeDriver版本匹配
2. **反爬虫限制**: 抖音有较强的反爬虫机制,可能需要调整请求频率或使用代理
3. **页面结构变化**: 抖音页面结构可能会更新,导致数据提取失败,需要相应调整代码
## 输出数据格式
抓取的数据将保存为 `douyin_collection_data.json` 文件,包含:
- 合集标题
- 播放数据
- 视频列表信息
- 统计数据
## 免责声明
本工具仅供学习和研究使用。使用者需要遵守相关法律法规和平台服务条款,作者不承担任何法律责任。

View File

@ -0,0 +1,190 @@
# 抖音播放量自动更新定时任务配置说明
## 概述
本文档详细说明如何配置Windows任务计划程序实现每天晚上24:00凌晨0点自动运行抖音播放量抓取脚本。
## 前置条件
### 1. 确保MongoDB服务已安装并配置为自动启动
- 下载并安装MongoDB Community Server
- 配置MongoDB服务为自动启动
```cmd
sc config MongoDB start= auto
```
### 2. 确保Python环境正确
- Python 3.7+ 已安装
- 所需依赖包已安装:
```cmd
pip install selenium pymongo webdriver-manager
```
### 3. 浏览器登录状态保持
- **重要**:首次运行前,需要手动运行脚本并登录抖音账号
- 登录后,浏览器会保存登录状态到 `chrome_profile` 目录
- 自动模式将使用已保存的登录状态
## 配置步骤
### 步骤1创建日志目录
在脚本目录下创建 `logs` 文件夹:
```cmd
mkdir c:\Users\EDY\Desktop\test\logs
```
### 步骤2测试批处理脚本
手动运行批处理脚本测试:
```cmd
cd c:\Users\EDY\Desktop\test
auto_update_douyin.bat
```
### 步骤3打开任务计划程序
1. 按 `Win + R`,输入 `taskschd.msc`,回车
2. 或者在开始菜单搜索"任务计划程序"
### 步骤4创建基本任务
1. 在右侧操作面板点击"创建基本任务"
2. 输入任务名称:`抖音播放量自动更新`
3. 输入描述:`每天凌晨0点自动抓取抖音收藏合集播放量数据`
### 步骤5设置触发器
1. 选择"每天"
2. 设置开始时间:`22:00:00`晚上10点
3. 设置开始日期:选择明天的日期
4. 重复间隔:`每 1 天`
### 步骤6设置操作
1. 选择"启动程序"
2. 程序或脚本:`c:\Users\EDY\Desktop\test\auto_update_douyin.bat`
3. 添加参数:`--silent`
4. 起始于:`c:\Users\EDY\Desktop\test`
### 步骤7高级设置
1. 勾选"如果任务失败,重新启动"
2. 设置重新启动间隔:`5分钟`
3. 设置重新启动次数:`3次`
4. 勾选"如果请求后任务还在运行,强行将其停止"
5. 设置停止任务时间:`2小时`
### 步骤8条件设置
1. 取消勾选"只有在计算机使用交流电源时才启动此任务"
2. 勾选"唤醒计算机运行此任务"
3. 勾选"如果任务运行时间超过以下时间则停止任务2小时"
## 文件结构
```
c:\Users\EDY\Desktop\test\
├── douyin_playcount_scraper.py # 主抓取脚本
├── auto_update_douyin.bat # 批处理启动脚本
├── query_mongodb_data.py # 数据查询脚本
├── chrome_profile\ # 浏览器配置文件目录
├── logs\ # 日志文件目录
│ ├── auto_update_20241215.log # 每日日志文件
│ └── ...
├── douyin_collection_data.json # JSON数据文件
├── douyin_collection_playcounts.txt # TXT格式数据文件
└── 定时任务配置说明.md # 本说明文档
```
## 日志文件说明
### 日志位置
- 批处理日志:`logs\auto_update_YYYYMMDD.log`
- Python程序日志同一文件包含详细的执行信息
### 日志内容
- 任务开始和结束时间
- MongoDB连接状态
- 数据抓取进度
- 文件保存结果
- 错误信息(如有)
## 数据存储说明
### MongoDB数据结构
```json
{
"batch_id": "20241215_000000_abc123",
"batch_time": "2024-12-15 00:00:00",
"name": "剧本名称",
"playcount": "1.2万",
"playcount_raw": 12000
}
```
### 数据特点
- 每次运行生成新的批次ID
- 最新数据排在前面(按时间倒序)
- 历史数据完整保留
- 支持按剧本名称搜索历史数据
## 故障排除
### 常见问题
#### 1. MongoDB连接失败
**症状**:日志显示"MongoDB连接失败"
**解决方案**
```cmd
# 检查MongoDB服务状态
sc query MongoDB
# 启动MongoDB服务
net start MongoDB
```
#### 2. 浏览器登录失效
**症状**:抓取失败,提示需要登录
**解决方案**
1. 手动运行脚本:`python douyin_playcount_scraper.py`
2. 重新登录抖音账号
3. 登录状态会自动保存到 `chrome_profile` 目录
#### 3. Python环境问题
**症状**:批处理脚本报错"Python未安装"
**解决方案**
1. 确认Python已安装`python --version`
2. 确认Python在系统PATH中
3. 重新安装依赖:`pip install -r requirements.txt`
#### 4. 权限问题
**症状**:任务计划程序无法执行
**解决方案**
1. 以管理员身份运行任务计划程序
2. 设置任务以最高权限运行
3. 确保用户账户有足够权限
### 监控和维护
#### 查看任务执行历史
1. 打开任务计划程序
2. 找到"抖音播放量自动更新"任务
3. 查看"历史记录"选项卡
#### 查看数据抓取结果
运行查询脚本:
```cmd
python query_mongodb_data.py
```
#### 手动测试
定期手动运行测试:
```cmd
python douyin_playcount_scraper.py --auto-mode
```
## 注意事项
1. **网络连接**确保计算机在凌晨0点时有稳定的网络连接
2. **电源管理**:建议设置计算机不要在夜间自动休眠
3. **防火墙**确保防火墙不会阻止浏览器和MongoDB的网络访问
4. **磁盘空间**:定期清理旧的日志文件,避免占用过多磁盘空间
5. **数据备份**建议定期备份MongoDB数据库
## 联系支持
如遇到问题,请检查:
1. 日志文件中的错误信息
2. 任务计划程序的执行历史
3. MongoDB服务状态
4. 网络连接状态

33
docs/需求.md Normal file
View File

@ -0,0 +1,33 @@
现在这是我的项目概览:
项目概览
- 目标:抓取抖音“合集”和“单个视频”的数据,用于分析。
- 技术: selenium + webdriver-manager ,少量 requests ;支持无头模式与手动登录。
- 能力:从页面源码 JSON、DOM 选择器等多源提取数据,含反爬规避与调试截图。
脚本与输出
- douyin_scraper.py :抓取合集数据(标题、播放/点赞统计、视频列表);输出 douyin_collection_data.json 。
- douyin_scraper_enhanced.py :合集增强版(手动登录、页面状态分析、截图);输出 douyin_collection_data_enhanced.json 。
- douyin_video_scraper.py :抓取单视频互动数据(点赞/评论/分享/收藏/播放等)、基本信息与评论采样;输出 douyin_video_data.json ,保存页面截图。
运行与依赖
- 依赖: requirements.txt selenium 、 requests 、 webdriver-manager )。
- 运行示例: python3 douyin_scraper.py 、交互式增强版/视频版可选择手动登录。
注意事项
- 合规与频控:遵守平台条款与法律,控制抓取频率。
- 结构变更:抖音页面结构可能更新,需调整选择器/解析逻辑。
- 环境匹配:确保本机 Chrome 与 ChromeDriver 版本兼容。
需求:
根据链接https://www.douyin.com/video/7556193043586600201
后面的7556193043586600201是可以替换的获取到所有的短剧集数视频列表
7556193043586600201这个就是集数每一集的这个都不一样
进行爬取短剧的点赞数,评论数,收藏数,全部评论内容
总体来说就是输入视频的链接也就是这个https://www.douyin.com/video/7556193043586600201然后就可以自动的获取到该系列所有的视频列表有了列表之后只需要遍历列表对每个视频进行单独的爬取之后就可以获取到该视频的点赞数、收藏数、评论数、评论内容了获取的视频列表一定要保存起来
但是现在有一个简单的要求,就是可以输入一个视频的链接,然后只对这一集来进行爬取点赞数、收藏数、评论数、评论内容
流程:
输入视频链接->自动识别短剧集数列表->通过列表抓取所有集数短剧视频->获取该系列每个视频的点赞数、收藏数、评论数、评论内容->打印出来即可

65
scripts/check_mongodb.py Normal file
View File

@ -0,0 +1,65 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
检查MongoDB数据保存状态
"""
from pymongo import MongoClient
from datetime import datetime
import sys
def check_mongodb():
"""检查MongoDB连接和数据"""
try:
# 使用与主脚本相同的连接参数
client = MongoClient('localhost', 27017, serverSelectionTimeoutMS=5000)
# 测试连接
client.admin.command('ping')
print("MongoDB连接成功")
# 检查数据库和集合
db = client['douyin_data']
collection = db['play_vv_records']
total_count = collection.count_documents({})
print(f"总记录数: {total_count}")
# 检查今天的数据
today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
today_count = collection.count_documents({'batch_time': {'$gte': today_start}})
print(f"今天的数据记录数: {today_count}")
# 显示最新5条记录按时间倒序排列
print("\n最新5条记录按时间倒序排列:")
print("-" * 60)
for doc in collection.find().sort('batch_time', -1).limit(5):
print(f"合集名称: {doc.get('mix_name', '未知')}")
print(f"播放量: {doc.get('play_vv', 0):,} ({doc.get('playcount', '')})")
print(f"合集链接: {doc.get('video_url', '')}")
print(f"保存时间: {doc.get('batch_time', '')}")
print(f"视频ID数: {len(doc.get('aweme_ids', []))}")
print(f"封面图片: {'' if doc.get('cover_image_url') else ''}")
print("-" * 60)
# 显示字段结构
if total_count > 0:
sample = collection.find_one()
print(f"\n文档字段结构:")
for key in sample.keys():
print(f" - {key}: {type(sample[key]).__name__}")
except Exception as e:
print(f"检查MongoDB时出错: {e}")
return False
return True
if __name__ == '__main__':
print("=== MongoDB数据检查 ===")
success = check_mongodb()
if success:
print("\n检查完成")
else:
print("\n检查失败")
sys.exit(1)

View File

@ -0,0 +1,151 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
抖音播放量自动抓取定时器 - 跨平台版本
功能
- 每天上午9:35自动执行抖音播放量抓取任务
- 支持WindowsmacOSLinux
- 自动保存数据到MongoDB
"""
import schedule
import time
import subprocess
import sys
import os
import logging
from pathlib import Path
from datetime import datetime
# 配置日志的函数
def setup_logging():
"""设置日志配置"""
# 确保logs目录存在
os.makedirs('../logs', exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('../logs/scheduler.log', encoding='utf-8'),
logging.StreamHandler()
]
)
class DouyinAutoScheduler:
def __init__(self):
self.is_running = False
def run_douyin_scraper(self):
"""执行抖音播放量抓取任务"""
try:
logging.info("🚀 开始执行抖音播放量抓取任务...")
# 设置环境变量,确保自动模式
os.environ['AUTO_CONTINUE'] = '1'
# 构建脚本路径 - 现在在同一目录下
script_path = Path(__file__).parent / 'douyin_selenium_cdp_play_vv.py'
if not script_path.exists():
logging.error(f"❌ 脚本文件不存在: {script_path}")
return
logging.info(f"📁 执行脚本: {script_path}")
# 使用subprocess执行脚本
result = subprocess.run([
sys.executable,
str(script_path),
'--auto',
'--duration', '60'
], capture_output=True, text=True, encoding='utf-8', errors='ignore')
if result.returncode == 0:
logging.info("✅ 抖音播放量抓取任务执行成功")
if result.stdout:
logging.info(f"📄 输出: {result.stdout.strip()}")
else:
logging.error(f"❌ 任务执行失败,返回码: {result.returncode}")
if result.stderr:
logging.error(f"💥 错误信息: {result.stderr.strip()}")
if result.stdout:
logging.info(f"📄 输出: {result.stdout.strip()}")
except Exception as e:
logging.error(f"💥 执行任务时发生异常: {e}")
def setup_schedule(self):
"""设置定时任务"""
# 主执行时间每天上午9:35
schedule.every().day.at("09:35").do(self.run_douyin_scraper)
logging.info("⏰ 定时器已设置每天上午9:35执行抖音播放量抓取")
def show_next_run(self):
"""显示下次执行时间"""
jobs = schedule.get_jobs()
if jobs:
next_run = jobs[0].next_run
logging.info(f"⏰ 下次执行时间: {next_run}")
def run_once(self):
"""立即执行一次"""
logging.info("🔧 立即执行模式...")
self.run_douyin_scraper()
def run_test(self):
"""测试模式 - 立即执行一次"""
logging.info("🧪 测试模式 - 立即执行抖音播放量抓取任务...")
self.run_douyin_scraper()
def start_scheduler(self):
"""启动定时器"""
self.is_running = True
logging.info("🚀 抖音播放量自动抓取定时器已启动")
logging.info("⏰ 执行时间每天上午9:35")
logging.info("📁 目标脚本douyin_selenium_cdp_play_vv.py")
logging.info("💾 数据保存MongoDB")
logging.info("⏹️ 按 Ctrl+C 停止定时器")
self.show_next_run()
try:
while self.is_running:
schedule.run_pending()
time.sleep(1)
# 每分钟显示一次状态
if int(time.time()) % 60 == 0:
self.show_next_run()
except KeyboardInterrupt:
logging.info("\n⏹️ 定时器已停止")
self.is_running = False
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description='抖音播放量自动抓取定时器')
parser.add_argument('--test', action='store_true', help='测试模式 - 立即执行一次')
parser.add_argument('--once', action='store_true', help='立即执行一次并退出')
args = parser.parse_args()
# 设置日志配置
setup_logging()
scheduler = DouyinAutoScheduler()
if args.test:
scheduler.run_test()
elif args.once:
scheduler.run_once()
else:
scheduler.setup_schedule()
scheduler.start_scheduler()
if __name__ == '__main__':
main()

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,593 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
小程序专用抖音播放量数据API服务器
优化的数据格式和接口设计专为小程序使用
"""
from flask import Flask, jsonify, request
from flask_cors import CORS
from pymongo import MongoClient
from datetime import datetime, timedelta
import logging
import os
import re
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('miniprogram_api.log', encoding='utf-8'),
logging.StreamHandler()
]
)
app = Flask(__name__)
CORS(app) # 允许跨域访问,支持小程序调用
class MiniprogramAPI:
def __init__(self):
self.client = None
self.db = None
self.collection = None
self.connect_mongodb()
def connect_mongodb(self):
"""连接MongoDB数据库"""
try:
self.client = MongoClient('mongodb://localhost:27017/')
# 测试连接
self.client.admin.command('ping')
# 使用数据库与集合
self.db = self.client['douyin_data']
self.collection = self.db['play_vv_records']
logging.info("MongoDB连接成功")
return True
except Exception as e:
logging.error(f"MongoDB连接失败: {e}")
return False
def format_playcount(self, playcount_str):
"""格式化播放量字符串为数字"""
if not playcount_str:
return 0
try:
if isinstance(playcount_str, (int, float)):
return int(playcount_str)
playcount_str = str(playcount_str).strip()
# 处理亿、万等单位
if "亿" in playcount_str:
num = float(re.findall(r'[\d.]+', playcount_str)[0])
return int(num * 100000000)
elif "" in playcount_str:
num = float(re.findall(r'[\d.]+', playcount_str)[0])
return int(num * 10000)
else:
# 尝试直接转换数字
return int(float(playcount_str))
except:
return 0
def format_cover_url(self, cover_data):
"""格式化封面图片URL"""
if not cover_data:
return ""
if isinstance(cover_data, str):
return cover_data
elif isinstance(cover_data, dict) and 'url_list' in cover_data:
return cover_data['url_list'][0] if cover_data['url_list'] else ""
else:
return ""
def format_time(self, time_obj):
"""格式化时间"""
if not time_obj:
return ""
if isinstance(time_obj, datetime):
return time_obj.strftime("%Y-%m-%d %H:%M:%S")
else:
return str(time_obj)
def format_video_item(self, doc):
"""格式化单个视频数据项 - 完全按照数据库原始字段返回"""
return {
"_id": str(doc.get("_id", "")),
"batch_time": self.format_time(doc.get("batch_time")),
"mix_name": doc.get("mix_name", ""),
"video_url": doc.get("video_url", ""),
"playcount": doc.get("playcount", ""),
"play_vv": doc.get("play_vv", 0),
"request_id": doc.get("request_id", ""),
"rank": doc.get("rank", 0),
"aweme_ids": doc.get("aweme_ids", []),
"cover_image_url": doc.get("cover_image_url", ""),
"cover_backup_urls": doc.get("cover_backup_urls", [])
}
def get_video_list(self, page=1, limit=20, sort_by="playcount"):
"""获取视频列表(分页)"""
try:
# 计算跳过的数量
skip = (page - 1) * limit
# 设置排序字段
if sort_by == "growth":
# 按增长排序需要特殊处理
return self.get_growth_videos(page, limit)
else:
sort_field = "play_vv" if sort_by == "playcount" else "batch_time"
sort_order = -1 # 降序
# 获取今天的日期
today = datetime.now().date()
# 只查询今天的数据
query_condition = {
"batch_time": {
"$gte": datetime(today.year, today.month, today.day),
"$lt": datetime(today.year, today.month, today.day) + timedelta(days=1)
}
}
# 查询数据并按短剧名称分组,取每个短剧的最新记录
pipeline = [
{"$match": query_condition},
{"$sort": {"batch_time": -1}}, # 按时间倒序
{"$group": {
"_id": "$mix_name", # 按短剧名称分组
"latest_doc": {"$first": "$$ROOT"} # 取每个分组的第一条记录(最新记录)
}},
{"$replaceRoot": {"newRoot": "$latest_doc"}},
{"$sort": {sort_field: sort_order}},
{"$skip": skip},
{"$limit": limit}
]
docs = list(self.collection.aggregate(pipeline))
# 获取总数
total_pipeline = [
{"$match": query_condition},
{"$sort": {"batch_time": -1}},
{"$group": {"_id": "$mix_name"}},
{"$count": "total"}
]
total_result = list(self.collection.aggregate(total_pipeline))
total = total_result[0]["total"] if total_result else 0
# 格式化数据
video_list = []
for doc in docs:
item = self.format_video_item(doc)
video_list.append(item)
return {
"success": True,
"data": video_list,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"pages": (total + limit - 1) // limit,
"has_next": page * limit < total,
"has_prev": page > 1
},
"sort_by": sort_by,
"update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
except Exception as e:
logging.error(f"获取视频列表失败: {e}")
return {"success": False, "message": f"获取数据失败: {str(e)}"}
def get_growth_videos(self, page=1, limit=20, start_date=None, end_date=None):
"""获取按播放量增长排序的视频列表"""
try:
# 计算跳过的数量
skip = (page - 1) * limit
# 如果没有提供日期,默认使用今天和昨天
if not start_date or not end_date:
end_date = datetime.now().date()
start_date = end_date - timedelta(days=1)
else:
# 转换字符串日期为datetime对象
if isinstance(start_date, str):
start_date = datetime.strptime(start_date, "%Y-%m-%d").date()
if isinstance(end_date, str):
end_date = datetime.strptime(end_date, "%Y-%m-%d").date()
# 查询结束日期的数据
end_cursor = self.collection.find({
"batch_time": {
"$gte": datetime(end_date.year, end_date.month, end_date.day),
"$lt": datetime(end_date.year, end_date.month, end_date.day) + timedelta(days=1)
}
})
end_data = list(end_cursor)
# 查询开始日期的数据
start_cursor = self.collection.find({
"batch_time": {
"$gte": datetime(start_date.year, start_date.month, start_date.day),
"$lt": datetime(start_date.year, start_date.month, start_date.day) + timedelta(days=1)
}
})
start_data = list(start_cursor)
# 创建字典以便快速查找
end_dict = {item["mix_name"]: item for item in end_data}
start_dict = {item["mix_name"]: item for item in start_data}
# 计算增长数据
growth_data = []
for mix_name, end_item in end_dict.items():
if mix_name in start_dict:
start_item = start_dict[mix_name]
growth = end_item.get("play_vv", 0) - start_item.get("play_vv", 0)
# 只保留增长为正的数据
if growth > 0:
item = self.format_video_item(end_item)
item["growth"] = growth
item["start_date"] = start_date.strftime("%Y-%m-%d")
item["end_date"] = end_date.strftime("%Y-%m-%d")
growth_data.append(item)
else:
# 如果开始日期没有数据,但结束日期有,也认为是新增长
item = self.format_video_item(end_item)
item["growth"] = end_item.get("play_vv", 0)
item["start_date"] = start_date.strftime("%Y-%m-%d")
item["end_date"] = end_date.strftime("%Y-%m-%d")
growth_data.append(item)
# 按增长值降序排序
growth_data.sort(key=lambda x: x.get("growth", 0), reverse=True)
# 分页处理
total = len(growth_data)
paginated_data = growth_data[skip:skip + limit]
# 添加排名
for i, item in enumerate(paginated_data):
item["rank"] = skip + i + 1
return {
"success": True,
"data": paginated_data,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"pages": (total + limit - 1) // limit,
"has_next": page * limit < total,
"has_prev": page > 1
},
"sort_by": "growth",
"date_range": {
"start_date": start_date.strftime("%Y-%m-%d"),
"end_date": end_date.strftime("%Y-%m-%d")
},
"update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
except Exception as e:
logging.error(f"获取增长视频列表失败: {e}")
# 如果增长计算失败,返回按播放量排序的数据作为备选
return self.get_video_list(page, limit, "playcount")
def get_top_videos(self, limit=10):
"""获取热门视频TOP榜单"""
try:
# 按播放量排序获取热门视频
cursor = self.collection.find().sort("play_vv", -1).limit(limit)
docs = list(cursor)
if not docs:
return {"success": False, "message": "暂无数据"}
# 格式化数据
top_list = []
for doc in docs:
item = self.format_video_item(doc)
top_list.append(item)
return {
"success": True,
"data": top_list,
"total": len(top_list),
"update_time": self.format_time(docs[0].get("batch_time")) if docs else ""
}
except Exception as e:
logging.error(f"获取热门视频失败: {e}")
return {"success": False, "message": f"获取数据失败: {str(e)}"}
def search_videos(self, keyword, page=1, limit=10):
"""搜索视频"""
try:
if not keyword:
return {"success": False, "message": "请提供搜索关键词"}
# 计算跳过的数量
skip = (page - 1) * limit
# 构建搜索条件(模糊匹配合集名称)
search_condition = {
"mix_name": {"$regex": keyword, "$options": "i"}
}
# 查询数据
cursor = self.collection.find(search_condition).sort("play_vv", -1).skip(skip).limit(limit)
docs = list(cursor)
# 获取搜索结果总数
total = self.collection.count_documents(search_condition)
# 格式化数据
search_results = []
for doc in docs:
item = self.format_video_item(doc)
search_results.append(item)
return {
"success": True,
"data": search_results,
"keyword": keyword,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"pages": (total + limit - 1) // limit,
"has_next": page * limit < total,
"has_prev": page > 1
},
"update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
except Exception as e:
logging.error(f"搜索视频失败: {e}")
return {"success": False, "message": f"搜索失败: {str(e)}"}
def get_video_detail(self, video_id):
"""获取视频详情"""
try:
from bson import ObjectId
# 尝试通过ObjectId查找
try:
doc = self.collection.find_one({"_id": ObjectId(video_id)})
except:
# 如果ObjectId无效尝试其他字段
doc = self.collection.find_one({
"$or": [
{"mix_name": video_id},
{"request_id": video_id}
]
})
if not doc:
return {"success": False, "message": "未找到视频信息"}
# 格式化详细信息 - 只返回数据库原始字段
detail = self.format_video_item(doc)
return {
"success": True,
"data": detail,
"update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
except Exception as e:
logging.error(f"获取视频详情失败: {e}")
return {"success": False, "message": f"获取详情失败: {str(e)}"}
def get_statistics(self):
"""获取统计信息"""
try:
# 基本统计
total_videos = self.collection.count_documents({})
if total_videos == 0:
return {"success": False, "message": "暂无数据"}
# 播放量统计
pipeline = [
{
"$group": {
"_id": None,
"total_playcount": {"$sum": "$play_vv"},
"avg_playcount": {"$avg": "$play_vv"},
"max_playcount": {"$max": "$play_vv"},
"min_playcount": {"$min": "$play_vv"}
}
}
]
stats_result = list(self.collection.aggregate(pipeline))
stats = stats_result[0] if stats_result else {}
# 获取最新更新时间
latest_doc = self.collection.find().sort("batch_time", -1).limit(1)
latest_time = ""
if latest_doc:
latest_list = list(latest_doc)
if latest_list:
latest_time = self.format_time(latest_list[0].get("batch_time"))
# 热门分类统计(按播放量区间)
categories = [
{"name": "超热门", "min": 100000000, "count": 0}, # 1亿+
{"name": "热门", "min": 50000000, "max": 99999999, "count": 0}, # 5000万-1亿
{"name": "中等", "min": 10000000, "max": 49999999, "count": 0}, # 1000万-5000万
{"name": "一般", "min": 0, "max": 9999999, "count": 0} # 1000万以下
]
for category in categories:
if "max" in category:
count = self.collection.count_documents({
"play_vv": {"$gte": category["min"], "$lte": category["max"]}
})
else:
count = self.collection.count_documents({
"play_vv": {"$gte": category["min"]}
})
category["count"] = count
return {
"success": True,
"data": {
"total_videos": total_videos,
"total_playcount": stats.get("total_playcount", 0),
"avg_playcount": int(stats.get("avg_playcount", 0)),
"max_playcount": stats.get("max_playcount", 0),
"min_playcount": stats.get("min_playcount", 0),
"categories": categories,
"latest_update": latest_time
},
"update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
except Exception as e:
logging.error(f"获取统计信息失败: {e}")
return {"success": False, "message": f"获取统计失败: {str(e)}"}
# 创建API实例
api = MiniprogramAPI()
# API路由定义
@app.route('/')
def index():
"""API首页"""
return jsonify({
"name": "小程序抖音播放量数据API",
"version": "2.0",
"description": "专为小程序优化的抖音播放量数据接口",
"endpoints": {
"/api/videos": "获取视频列表 (支持分页和排序)",
"/api/top": "获取热门视频榜单",
"/api/search": "搜索视频",
"/api/detail": "获取视频详情",
"/api/stats": "获取统计信息",
"/api/health": "健康检查"
},
"features": [
"分页支持",
"多种排序方式",
"搜索功能",
"详情查看",
"统计分析",
"小程序优化"
]
})
@app.route('/api/videos')
def get_videos():
"""获取视频列表"""
page = request.args.get('page', 1, type=int)
limit = request.args.get('limit', 20, type=int)
sort_by = request.args.get('sort', 'playcount') # playcount, time, 或 growth
start_date = request.args.get('start_date', None)
end_date = request.args.get('end_date', None)
# 限制参数范围
page = max(1, page)
limit = min(50, max(1, limit)) # 限制每页最多50条
if sort_by == "growth":
# 增长排序需要特殊处理,支持日期参数
result = api.get_growth_videos(page, limit, start_date, end_date)
else:
result = api.get_video_list(page, limit, sort_by)
return jsonify(result)
@app.route('/api/top')
def get_top():
"""获取热门视频榜单"""
limit = request.args.get('limit', 10, type=int)
limit = min(50, max(1, limit)) # 限制最多50条
result = api.get_top_videos(limit)
return jsonify(result)
@app.route('/api/search')
def search():
"""搜索视频"""
keyword = request.args.get('q', '').strip()
page = request.args.get('page', 1, type=int)
limit = request.args.get('limit', 10, type=int)
# 限制参数范围
page = max(1, page)
limit = min(30, max(1, limit)) # 搜索结果限制每页最多30条
result = api.search_videos(keyword, page, limit)
return jsonify(result)
@app.route('/api/detail')
def get_detail():
"""获取视频详情"""
video_id = request.args.get('id', '').strip()
if not video_id:
return jsonify({"success": False, "message": "请提供视频ID"})
result = api.get_video_detail(video_id)
return jsonify(result)
@app.route('/api/stats')
def get_stats():
"""获取统计信息"""
result = api.get_statistics()
return jsonify(result)
@app.route('/api/health')
def health_check():
"""健康检查"""
try:
# 检查MongoDB连接
api.client.admin.command('ping')
# 获取基本信息
total_count = api.collection.count_documents({})
return jsonify({
"success": True,
"status": "healthy",
"mongodb": "connected",
"total_records": total_count,
"server_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"api_version": "2.0"
})
except Exception as e:
return jsonify({
"success": False,
"status": "unhealthy",
"mongodb": "disconnected",
"error": str(e),
"server_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
})
if __name__ == '__main__':
print("启动小程序专用抖音播放量API服务器...")
print("API地址: http://localhost:5001")
print("小程序API接口列表:")
print(" - GET /api/videos?page=1&limit=20&sort=playcount 获取视频列表(总播放量排序)")
print(" - GET /api/videos?page=1&limit=20&sort=growth 获取视频列表(增长排序,默认昨天到今天的差值)")
print(" - GET /api/videos?page=1&limit=20&sort=growth&start_date=2025-10-16&end_date=2025-10-17 获取视频列表(自定义日期范围增长排序)")
print(" - GET /api/top?limit=10 获取热门榜单")
print(" - GET /api/search?q=关键词&page=1&limit=10 搜索视频")
print(" - GET /api/detail?id=视频ID 获取视频详情")
print(" - GET /api/stats 获取统计信息")
print(" - GET /api/health 健康检查")
print("专为小程序优化:分页、搜索、详情、统计、增长排序、自定义日期范围")
app.run(host='0.0.0.0', port=5001, debug=True)

View File

@ -0,0 +1,294 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MongoDB数据库快速查看工具
一次性显示数据库结构统计信息和最新数据
"""
import pymongo
from pymongo import MongoClient
from datetime import datetime
import json
from collections import defaultdict
def connect_mongodb(connection_string='mongodb://localhost:27017/'):
"""连接到MongoDB"""
try:
client = MongoClient(connection_string, serverSelectionTimeoutMS=5000)
client.admin.command('ping')
print(f"✅ 成功连接到MongoDB: {connection_string}")
return client
except Exception as e:
print(f"❌ 连接MongoDB失败: {e}")
return None
def analyze_document_schema(document):
"""分析文档结构"""
if not document:
return {}
schema = {}
for key, value in document.items():
if key == '_id':
schema[key] = {'type': 'ObjectId', 'example': str(value)}
elif isinstance(value, str):
schema[key] = {'type': 'string', 'example': value[:50] + '...' if len(value) > 50 else value}
elif isinstance(value, int):
schema[key] = {'type': 'integer', 'example': value}
elif isinstance(value, float):
schema[key] = {'type': 'float', 'example': value}
elif isinstance(value, bool):
schema[key] = {'type': 'boolean', 'example': value}
elif isinstance(value, datetime):
schema[key] = {'type': 'datetime', 'example': value.strftime('%Y-%m-%d %H:%M:%S')}
elif isinstance(value, list):
schema[key] = {
'type': 'array',
'length': len(value),
'example': value[:3] if len(value) <= 3 else value[:3] + ['...']
}
elif isinstance(value, dict):
schema[key] = {
'type': 'object',
'keys': list(value.keys())[:5],
'example': {k: v for k, v in list(value.items())[:2]}
}
else:
schema[key] = {'type': type(value).__name__, 'example': str(value)[:50]}
return schema
def display_database_info(client):
"""显示数据库信息"""
print("\n" + "="*80)
print("📊 MongoDB 数据库结构分析")
print("="*80)
try:
db_names = client.list_database_names()
for db_name in db_names:
if db_name in ['admin', 'local', 'config']:
continue
db = client[db_name]
collections = db.list_collection_names()
print(f"\n🗄️ 数据库: {db_name}")
print(f" 集合数量: {len(collections)}")
for coll_name in collections:
collection = db[coll_name]
count = collection.count_documents({})
print(f"\n 📁 集合: {coll_name}")
print(f" 文档数量: {count:,}")
if count > 0:
# 获取样本文档来分析结构
sample_doc = collection.find_one()
schema = analyze_document_schema(sample_doc)
if schema:
print(f" 📋 字段结构:")
for field_name, field_info in schema.items():
print(f"{field_name}: {field_info['type']}")
if 'example' in field_info:
example = field_info['example']
if isinstance(example, str) and len(example) > 100:
example = example[:100] + "..."
print(f" 示例: {example}")
else:
print(f" ⚠️ 集合为空")
except Exception as e:
print(f"❌ 获取数据库信息失败: {e}")
def display_statistics(client, db_name='douyin_data', collection_name='play_vv_records'):
"""显示统计信息"""
try:
db = client[db_name]
collection = db[collection_name]
print(f"\n📊 统计信息 ({db_name}.{collection_name})")
print("-" * 50)
# 基本统计
total_count = collection.count_documents({})
print(f"📈 总文档数: {total_count:,}")
if total_count == 0:
print("⚠️ 集合为空,无法显示统计信息")
return
# 时间范围统计
time_fields = ['batch_time', 'created_at', 'timestamp']
for field in time_fields:
if collection.find_one({field: {'$exists': True}}):
pipeline = [
{'$group': {
'_id': None,
'min_time': {'$min': f'${field}'},
'max_time': {'$max': f'${field}'}
}}
]
result = list(collection.aggregate(pipeline))
if result:
min_time = result[0]['min_time']
max_time = result[0]['max_time']
print(f"📅 时间范围 ({field}):")
print(f" 最早: {min_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f" 最新: {max_time.strftime('%Y-%m-%d %H:%M:%S')}")
break
# 播放量统计
playcount_fields = ['play_vv', 'playcount', 'play_count', 'views']
for field in playcount_fields:
if collection.find_one({field: {'$exists': True, '$type': 'number'}}):
pipeline = [
{'$group': {
'_id': None,
'total_plays': {'$sum': f'${field}'},
'avg_plays': {'$avg': f'${field}'},
'max_plays': {'$max': f'${field}'},
'min_plays': {'$min': f'${field}'}
}}
]
result = list(collection.aggregate(pipeline))
if result:
stats = result[0]
print(f"🎬 播放量统计 ({field}):")
print(f" 总播放量: {stats['total_plays']:,}")
print(f" 平均播放量: {stats['avg_plays']:,.0f}")
print(f" 最高播放量: {stats['max_plays']:,}")
print(f" 最低播放量: {stats['min_plays']:,}")
break
# 热门内容统计
if collection.find_one({'mix_name': {'$exists': True}}):
print(f"\n🔥 热门内容 (按播放量排序):")
pipeline = [
{'$match': {'play_vv': {'$exists': True, '$type': 'number'}}},
{'$sort': {'play_vv': -1}},
{'$limit': 5},
{'$project': {'mix_name': 1, 'play_vv': 1, 'batch_time': 1}}
]
top_content = list(collection.aggregate(pipeline))
for i, content in enumerate(top_content, 1):
name = content.get('mix_name', '未知')
plays = content.get('play_vv', 0)
time_str = content.get('batch_time', datetime.now()).strftime('%m-%d %H:%M')
print(f" {i}. {name}: {plays:,} ({time_str})")
except Exception as e:
print(f"❌ 获取统计信息失败: {e}")
def display_recent_data(client, db_name='douyin_data', collection_name='play_vv_records', limit=3):
"""显示最近的数据"""
try:
db = client[db_name]
collection = db[collection_name]
print(f"\n📈 最近 {limit} 条数据 ({db_name}.{collection_name})")
print("-" * 80)
# 尝试按时间字段排序
time_fields = ['batch_time', 'created_at', 'timestamp', '_id']
sort_field = None
for field in time_fields:
if collection.find_one({field: {'$exists': True}}):
sort_field = field
break
if sort_field:
recent_docs = list(collection.find().sort(sort_field, -1).limit(limit))
else:
recent_docs = list(collection.find().limit(limit))
if not recent_docs:
print("⚠️ 没有找到数据")
return
for i, doc in enumerate(recent_docs, 1):
print(f"\n📄 记录 {i}:")
display_document(doc)
except Exception as e:
print(f"❌ 获取最近数据失败: {e}")
def display_document(doc, indent=2):
"""显示单个文档"""
spaces = " " * indent
for key, value in doc.items():
if key == '_id':
print(f"{spaces}🆔 {key}: {value}")
elif isinstance(value, datetime):
print(f"{spaces}📅 {key}: {value.strftime('%Y-%m-%d %H:%M:%S')}")
elif isinstance(value, str):
display_value = value[:100] + "..." if len(value) > 100 else value
print(f"{spaces}📝 {key}: {display_value}")
elif isinstance(value, (int, float)):
if key in ['playcount', 'play_count', 'views', 'play_vv']:
print(f"{spaces}📊 {key}: {value:,}")
else:
print(f"{spaces}🔢 {key}: {value}")
elif isinstance(value, list):
print(f"{spaces}📋 {key}: [{len(value)} 项]")
if len(value) > 0 and len(value) <= 3:
for item in value[:3]:
item_str = str(item)[:50] + "..." if len(str(item)) > 50 else str(item)
print(f"{spaces} - {item_str}")
elif len(value) > 3:
for item in value[:2]:
item_str = str(item)[:50] + "..." if len(str(item)) > 50 else str(item)
print(f"{spaces} - {item_str}")
print(f"{spaces} ... 还有 {len(value)-2}")
elif isinstance(value, dict):
print(f"{spaces}📦 {key}: {{对象}}")
if len(value) <= 3:
for k, v in value.items():
v_str = str(v)[:50] + "..." if len(str(v)) > 50 else str(v)
print(f"{spaces} {k}: {v_str}")
else:
for k, v in list(value.items())[:2]:
v_str = str(v)[:50] + "..." if len(str(v)) > 50 else str(v)
print(f"{spaces} {k}: {v_str}")
print(f"{spaces} ... 还有 {len(value)-2} 个字段")
else:
print(f"{spaces}{key}: {value}")
def main():
"""主函数"""
print("🚀 MongoDB 数据库快速查看工具")
print(f"⏰ 查看时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# 连接数据库
client = connect_mongodb()
if not client:
return
try:
# 显示数据库结构
display_database_info(client)
# 显示统计信息
display_statistics(client)
# 显示最近数据
display_recent_data(client)
print(f"\n{'='*80}")
print("✅ 数据库查看完成!")
print("💡 提示: 运行 'python scripts/mongodb_viewer.py' 可以使用交互式查看器")
print("🔄 提示: 重新运行此脚本可以查看最新数据")
except KeyboardInterrupt:
print("\n👋 程序被用户中断")
finally:
if client:
client.close()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,142 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
查询MongoDB中的抖音播放量数据
"""
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure
from datetime import datetime
def connect_mongodb():
"""连接MongoDB"""
try:
client = MongoClient('mongodb://localhost:27017/', serverSelectionTimeoutMS=5000)
client.admin.command('ping')
db = client['douyin——data']
collection = db['playcounts']
print("MongoDB连接成功")
return client, collection
except ConnectionFailure:
print("MongoDB连接失败请确保MongoDB服务已启动")
return None, None
except Exception as e:
print(f"MongoDB连接出错: {e}")
return None, None
def query_latest_batches(collection, limit=5):
"""查询最近的几个批次数据"""
try:
# 按批次时间倒序获取最近的批次
pipeline = [
{"$group": {
"_id": "$batch_id",
"batch_time": {"$first": "$batch_time"},
"count": {"$sum": 1}
}},
{"$sort": {"batch_time": -1}},
{"$limit": limit}
]
batches = list(collection.aggregate(pipeline))
if not batches:
print("暂无数据")
return
print(f"\n===== 最近 {len(batches)} 个批次 =====")
for batch in batches:
batch_time = batch['batch_time'].strftime("%Y-%m-%d %H:%M:%S")
print(f"批次ID: {batch['_id']}, 时间: {batch_time}, 数据条数: {batch['count']}")
# 显示该批次的具体数据按播放量排序如果有rank字段则按rank排序否则按playcount_number排序
batch_data = list(collection.find(
{"batch_id": batch['_id']},
{"name": 1, "playcount": 1, "rank": 1, "playcount_number": 1, "_id": 0}
))
# 按rank排序如果存在否则按playcount_number降序排序
if batch_data and 'rank' in batch_data[0]:
batch_data.sort(key=lambda x: x.get('rank', 999))
elif batch_data and 'playcount_number' in batch_data[0]:
batch_data.sort(key=lambda x: x.get('playcount_number', 0), reverse=True)
for i, item in enumerate(batch_data, 1):
rank_info = f"[第{item.get('rank', i)}名] " if 'rank' in item else ""
print(f" {rank_info}{item['name']}")
print(f" 播放量: {item['playcount']}")
print()
except Exception as e:
print(f"查询数据失败: {e}")
def query_by_name(collection, name_keyword):
"""根据剧本名称关键词查询"""
try:
# 使用正则表达式进行模糊匹配
query = {"name": {"$regex": name_keyword, "$options": "i"}}
results = list(collection.find(query).sort("batch_time", -1))
if not results:
print(f"未找到包含'{name_keyword}'的剧本")
return
print(f"\n===== 包含'{name_keyword}'的剧本 =====")
for result in results:
batch_time = result['batch_time'].strftime("%Y-%m-%d %H:%M:%S")
print(f"剧本: {result['name']}")
print(f"播放量: {result['playcount']}")
print(f"抓取时间: {batch_time}")
print(f"批次ID: {result['batch_id']}")
print("-" * 30)
except Exception as e:
print(f"查询失败: {e}")
def main():
print("抖音播放量数据查询工具")
print("=" * 40)
client, collection = connect_mongodb()
if collection is None:
return
try:
while True:
print("\n请选择操作:")
print("1. 查看最近的批次数据")
print("2. 根据剧本名称搜索")
print("3. 退出")
choice = input("请输入选项 (1-3): ").strip()
if choice == '1':
limit = input("显示最近几个批次? (默认5): ").strip()
try:
limit = int(limit) if limit else 5
except ValueError:
limit = 5
query_latest_batches(collection, limit)
elif choice == '2':
keyword = input("请输入剧本名称关键词: ").strip()
if keyword:
query_by_name(collection, keyword)
else:
print("关键词不能为空")
elif choice == '3':
break
else:
print("无效选项,请重新选择")
except KeyboardInterrupt:
print("\n用户中断操作")
finally:
if client:
client.close()
print("已断开MongoDB连接")
if __name__ == '__main__':
main()

View File

@ -0,0 +1,55 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
查看MongoDB最新数据 - 始终按时间倒序排列
"""
from pymongo import MongoClient
from datetime import datetime
def view_latest_data(limit=20):
"""查看最新数据"""
try:
client = MongoClient('localhost', 27017)
db = client['douyin_data']
collection = db['play_vv_records']
print("=== 抖音播放量最新数据 ===")
print(f"显示最新 {limit} 条记录(按时间倒序排列)")
print("=" * 80)
# 获取最新数据,按时间倒序排列
latest_docs = list(collection.find().sort('batch_time', -1).limit(limit))
if not latest_docs:
print("没有找到数据")
return
for i, doc in enumerate(latest_docs, 1):
print(f"\n记录 #{i}")
print("-" * 50)
print(f"合集名称: {doc.get('mix_name', '未知')}")
print(f"播放量: {doc.get('play_vv', 0):,} ({doc.get('playcount', '')})")
print(f"合集链接: {doc.get('video_url', '')}")
print(f"保存时间: {doc.get('batch_time', '')}")
print(f"视频ID数: {len(doc.get('aweme_ids', []))}")
print(f"封面图片: {'' if doc.get('cover_image_url') else ''}")
# 显示统计信息
total_count = collection.count_documents({})
today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
today_count = collection.count_documents({'batch_time': {'$gte': today_start}})
print(f"\n" + "=" * 80)
print(f"统计信息:")
print(f"- 总记录数: {total_count}")
print(f"- 今天记录数: {today_count}")
print(f"- 最新记录时间: {latest_docs[0].get('batch_time')}")
except Exception as e:
print(f"查看数据时出错: {e}")
if __name__ == '__main__':
import sys
limit = int(sys.argv[1]) if len(sys.argv) > 1 else 20
view_latest_data(limit)