rank_backend/backend/Timer_worker.py

463 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
抖音播放量自动抓取定时器 - 跨平台版本
功能:
- 每小时整点自动执行抖音播放量抓取任务
- 数据抓取完成后自动生成榜单
- 支持Windows、macOS、Linux
- 自动保存数据到MongoDB
使用方法:
- 正常模式: python Timer_worker.py 启动定时器
- 测试模式: python Timer_worker.py --test 立即执行一次
- 单次执行: python Timer_worker.py --once 立即执行一次并退出
- 仅生成榜单: python Timer_worker.py --ranking-only 仅生成榜单,不抓取数据
"""
import schedule
import time
import sys
import os
import logging
import argparse
from datetime import datetime, date, timedelta
import config
# 添加项目路径到 Python 路径
sys.path.append(os.path.join(os.path.dirname(__file__), 'handlers', 'Rankings'))
from handlers.Rankings.rank_data_scraper import DouyinPlayVVScraper
def setup_timer_environment():
    """Apply the timer-mode environment settings from ``config`` and log them.

    Delegates to ``config.apply_timer_environment()`` and then writes one INFO
    log record per key/value pair found in ``config.TIMER_ENV_CONFIG``.
    """
    config.apply_timer_environment()
    env_settings = config.TIMER_ENV_CONFIG
    for env_name, env_value in env_settings.items():
        logging.info(f"设置环境变量: {env_name}={env_value}")
def setup_logging(quiet_mode=False):
    """Configure the root logger to write to a log file and to the console.

    Records go to ``handlers/Rankings/logs/scheduler.log`` under this script's
    directory (created if missing) and to a console stream handler, with the
    root level set to INFO. ``logging.basicConfig`` is a no-op when the root
    logger already has handlers.

    Args:
        quiet_mode: If true, every non-file stream handler attached to the root
            logger is raised to WARNING, so the console only shows warnings and
            errors while the log file keeps receiving INFO records.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    log_directory = os.path.join(here, 'handlers', 'Rankings', 'logs')
    os.makedirs(log_directory, exist_ok=True)

    log_file_handler = logging.FileHandler(os.path.join(log_directory, 'scheduler.log'), encoding='utf-8')
    console_handler = logging.StreamHandler()
    logging.basicConfig(
        level=logging.INFO,  # the file still records everything from INFO up
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[log_file_handler, console_handler],
    )

    if not quiet_mode:
        return

    # FileHandler subclasses StreamHandler, so exclude it explicitly to keep
    # the log file at INFO while only the console is quietened.
    console_only = [
        handler
        for handler in logging.getLogger().handlers
        if isinstance(handler, logging.StreamHandler)
        and not isinstance(handler, logging.FileHandler)
    ]
    for handler in console_only:
        handler.setLevel(logging.WARNING)
class DouyinAutoScheduler:
    """Hourly Douyin play-count scraper and daily growth-ranking generator.

    Scraped batches are read from the ``Ranking_storage_list`` collection and
    the computed ranking document is written to ``Ranking_storage``; both are
    obtained from the project's ``database.db`` handle.
    """

    def __init__(self):
        # Loop flag polled by start_scheduler(); cleared on Ctrl+C.
        self.is_running = False
        # main() overwrites this with the selected run mode; initialized here
        # so the attribute always exists.
        self._is_timer_mode = False

    @staticmethod
    def _log_exception(message, exc):
        """Log ``"<message>: <exc>"`` and the active traceback as two ERROR records.

        Must be called from inside an ``except`` block so that
        ``traceback.format_exc()`` sees the exception being handled.
        """
        import traceback
        logging.error(f"{message}: {exc}")
        logging.error(f"详细错误信息: {traceback.format_exc()}")

    def _normalize_play_vv(self, play_vv):
        """Return ``play_vv`` as a number.

        ``int``/``float`` values are returned unchanged. Strings are parsed after
        stripping surrounding whitespace and ``,`` thousands separators; a
        trailing ``万`` (x10^4) or ``亿`` (x10^8) unit is applied as a multiplier,
        e.g. ``"12万"`` -> 120000 and ``"1.5亿"`` -> 150000000. The parsed value
        is rounded to the nearest integer (ties to even). Any other type, or a
        string that cannot be parsed, yields 0.
        """
        if isinstance(play_vv, (int, float)):
            return play_vv
        if not isinstance(play_vv, str):
            return 0

        from decimal import Decimal

        text = play_vv.strip().replace(',', '')
        multiplier = 1
        for unit, factor in (('亿', 100_000_000), ('万', 10_000)):
            if text.endswith(unit):
                text = text[:-len(unit)]
                multiplier = factor
                break
        try:
            # Decimal keeps values like "1.15万" exact (11500); float math would
            # produce 11499.999... for the same input.
            return round(Decimal(text) * multiplier)
        except (ArithmeticError, ValueError):
            # ArithmeticError covers decimal.InvalidOperation (unparseable text)
            # and OverflowError (infinity); ValueError covers NaN.
            return 0

    def _deduplicate_videos_by_mix_name(self, videos, include_rank=False):
        """Keep one record per ``mix_name``: the one with the highest play count.

        Records with an empty or missing ``mix_name`` are skipped. Play counts
        are normalized with ``_normalize_play_vv`` before comparing; on a tie
        the earliest record is kept.

        Args:
            videos: Iterable of video dicts (e.g. documents read from
                ``Ranking_storage_list``).
            include_rank: When true, each value is a compact summary
                ``{"play_vv", "video_id", "rank": 0}`` where ``rank`` is a
                placeholder for the caller to fill in. When false, each value
                is the winning video dict itself, whose ``play_vv`` field is
                overwritten in place with the normalized number.

        Returns:
            Dict mapping ``mix_name`` to the kept value, in first-seen order.
        """
        unique_data = {}
        for video in videos:
            mix_name = video.get("mix_name", "")
            if not mix_name:
                continue
            play_vv = self._normalize_play_vv(video.get("play_vv", 0))
            if mix_name in unique_data and not play_vv > unique_data[mix_name].get("play_vv", 0):
                continue
            if include_rank:
                unique_data[mix_name] = {
                    "play_vv": play_vv,
                    "video_id": str(video.get("_id", "")),
                    "rank": 0,  # filled in later by the caller
                }
            else:
                video["play_vv"] = play_vv
                unique_data[mix_name] = video
        return unique_data

    def run_douyin_scraper(self):
        """Scrape Douyin play counts once, then regenerate today's ranking.

        Exceptions are logged with their traceback instead of being raised, so
        they do not propagate out of ``schedule.run_pending()`` and end the loop
        in ``start_scheduler`` (which only catches ``KeyboardInterrupt``).
        """
        try:
            logging.warning("🚀 开始执行抖音播放量抓取任务...")
            # Set the environment variables that put the scraper into timer /
            # automatic mode.
            setup_timer_environment()
            scraper = DouyinPlayVVScraper(
                start_url="https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation",
                auto_continue=True,
                duration_s=60,
            )
            logging.info("📁 开始执行抓取任务...")
            scraper.run()
            logging.info("✅ 抖音播放量抓取任务执行成功")
            # Once the scrape finishes, rebuild today's ranking from the new batch.
            self.generate_daily_rankings()
        except Exception as e:
            self._log_exception("💥 执行任务时发生异常", e)

    def generate_daily_rankings(self):
        """Build today's play-count growth ranking and store it in ``Ranking_storage``.

        Today's data is the newest batch in ``Ranking_storage_list``; the
        baseline is the last batch scraped yesterday (by ``batch_time``). Both
        are deduplicated by ``mix_name`` and entries are sorted by play-count
        growth, largest first. Yesterday's rank per title is read from the most
        recent comprehensive ranking stored for yesterday; ``rank_change`` is 0
        when no such rank exists.

        Existing ranking documents for today are deleted only after the new
        ranking has been fully computed, so a run that finds no data or fails
        mid-computation leaves the previous ranking in place.

        Returns:
            True if a ranking was stored; False if there was no data or an
            error occurred (errors are logged).
        """
        try:
            from database import db

            douyin_collection = db['Ranking_storage_list']  # batches written by the timer
            rankings_collection = db['Ranking_storage']

            today = date.today()
            yesterday = today - timedelta(days=1)
            today_str = today.strftime('%Y-%m-%d')
            yesterday_str = yesterday.strftime('%Y-%m-%d')
            logging.info(f"📅 正在生成 {today_str} 的榜单(对比 {yesterday_str}...")
        except Exception as e:
            self._log_exception("💥 生成榜单时发生异常", e)
            return False

        try:
            logging.info("🔄 正在生成时间轴对比榜单...")

            today_videos = self._load_latest_batch_videos(douyin_collection)
            if today_videos is None:
                logging.warning("⚠️ 未找到任何数据")
                return False

            yesterday_data = self._load_yesterday_data(douyin_collection, yesterday)
            if not today_videos:
                logging.warning("⚠️ 榜单生成失败:无今日数据")
                return False

            # Raw batches carry no rank, so take yesterday's ranks from the
            # ranking document stored for yesterday (if any).
            yesterday_ranks = self._load_ranks_for_date(rankings_collection, yesterday_str)
            for mix_name, summary in yesterday_data.items():
                summary["rank"] = yesterday_ranks.get(mix_name, 0)

            comprehensive_ranking = self._build_comprehensive_ranking(
                today_videos, yesterday_data, today_str, yesterday_str
            )

            # Timestamp every calculation so each stored document is unique.
            current_timestamp = datetime.now()
            comprehensive_ranking["created_at"] = current_timestamp
            comprehensive_ranking["calculation_id"] = f"{today_str}_{current_timestamp.strftime('%H%M%S')}"

            # Today's older documents are replaced below, so counting documents
            # would always give 0; continue the sequence from the latest one.
            previous_ranking = rankings_collection.find_one(
                {"date": today_str, "type": "comprehensive"},
                sort=[("created_at", -1)],
            )
            previous_sequence = previous_ranking.get("calculation_sequence", 0) if previous_ranking else 0
            if not isinstance(previous_sequence, int):
                previous_sequence = 0
            sequence = previous_sequence + 1
            comprehensive_ranking["calculation_sequence"] = sequence

            # Replace today's ranking only now that the new one is ready.
            rankings_collection.delete_many({"date": today_str})
            logging.info(f"🗑️ 已清理 {today_str} 的旧榜单数据")
            rankings_collection.insert_one(comprehensive_ranking)
            logging.info(f"📝 创建了新的今日榜单数据(第{sequence}次计算,包含最新差值)")
            logging.info(f"🔖 计算ID: {comprehensive_ranking['calculation_id']}")

            new_count = sum(1 for entry in comprehensive_ranking["data"] if entry["timeline_data"]["is_new"])
            logging.info("✅ 时间轴对比榜单生成成功")
            logging.info(f"📊 总计 {len(comprehensive_ranking['data'])} 条记录")
            logging.info(f"🆕 新上榜 {new_count}")
            logging.info(f"🔄 对比基准日期: {yesterday_str}")
            return True
        except Exception as e:
            self._log_exception("💥 生成时间轴对比榜单时发生异常", e)
            return False

    def _load_latest_batch_videos(self, collection):
        """Return the deduplicated videos of the newest batch, or None if there is no batch."""
        latest_batch = collection.find_one(sort=[("batch_time", -1)])
        if not latest_batch:
            return None
        latest_batch_time = latest_batch.get("batch_time")
        logging.info(f"📊 找到最新批次时间: {latest_batch_time}")

        today_videos_raw = list(collection.find({"batch_time": latest_batch_time}).sort("play_vv", -1))
        logging.info(f"📊 最新批次数据数量: {len(today_videos_raw)}")

        today_videos = list(self._deduplicate_videos_by_mix_name(today_videos_raw).values())
        logging.info(f"📊 今日数据去重后:{len(today_videos)} 个独特短剧(原始数据:{len(today_videos_raw)} 条)")
        return today_videos

    def _load_yesterday_data(self, collection, yesterday):
        """Return ``{mix_name: {"play_vv", "video_id", "rank"}}`` from yesterday's last batch ({} if none)."""
        yesterday_start = datetime.combine(yesterday, datetime.min.time())
        yesterday_end = datetime.combine(yesterday, datetime.max.time())
        yesterday_latest_batch = collection.find_one(
            {"batch_time": {"$gte": yesterday_start, "$lte": yesterday_end}},
            sort=[("batch_time", -1)],
        )
        if not yesterday_latest_batch:
            logging.info("📊 未找到昨天的原始数据,将作为首次生成")
            return {}

        yesterday_batch_time = yesterday_latest_batch.get("batch_time")
        logging.info(f"📊 找到昨天最后一次抓取时间: {yesterday_batch_time}")
        yesterday_videos_raw = list(collection.find({"batch_time": yesterday_batch_time}).sort("play_vv", -1))
        yesterday_data = self._deduplicate_videos_by_mix_name(yesterday_videos_raw, include_rank=True)
        logging.info(f"📊 找到昨天的原始数据,共 {len(yesterday_data)} 个短剧(原始数据:{len(yesterday_videos_raw)} 条)")
        return yesterday_data

    def _load_ranks_for_date(self, rankings_collection, date_str):
        """Return ``{title: rank}`` from the latest comprehensive ranking stored for ``date_str``."""
        ranking_doc = rankings_collection.find_one(
            {"date": date_str, "type": "comprehensive"},
            sort=[("created_at", -1)],
        )
        ranks = {}
        if not ranking_doc:
            return ranks
        for entry in ranking_doc.get("data", []):
            title = entry.get("title")
            rank = entry.get("rank")
            if title and isinstance(rank, int) and rank > 0 and title not in ranks:
                ranks[title] = rank
        return ranks

    def _build_comprehensive_ranking(self, today_videos, yesterday_data, today_str, yesterday_str):
        """Build the growth-ranking document (without timestamps) from today's and yesterday's data."""
        videos_with_growth = []
        for video in today_videos:
            mix_name = video.get("mix_name", "")
            current_play_vv = video.get("play_vv", 0)
            previous = yesterday_data.get(mix_name)
            play_vv_change = 0
            play_vv_change_rate = 0
            if previous is not None:
                yesterday_play_vv = previous["play_vv"]
                play_vv_change = current_play_vv - yesterday_play_vv
                if yesterday_play_vv > 0:
                    play_vv_change_rate = round((play_vv_change / yesterday_play_vv) * 100, 2)
            videos_with_growth.append({
                "video": video,
                "play_vv_change": play_vv_change,
                "play_vv_change_rate": play_vv_change_rate,
                "is_new": previous is None,
                "yesterday_data": previous or {},
            })

        # Larger growth ranks higher; sort is stable, so ties keep input order.
        videos_with_growth.sort(key=lambda item: item["play_vv_change"], reverse=True)

        comprehensive_ranking = {
            "date": today_str,
            "type": "comprehensive",
            "name": "播放量增长榜单",
            "description": f"基于 {yesterday_str}{today_str} 播放量差值排序的榜单(差值越大排名越靠前)",
            "comparison_date": yesterday_str,
            "total_videos": len(videos_with_growth),
            "data": [],
        }
        for rank, item in enumerate(videos_with_growth, 1):
            video = item["video"]
            yesterday_entry = item["yesterday_data"]
            yesterday_rank = 0 if item["is_new"] else yesterday_entry.get("rank", 0)
            # Only a real (positive) rank from yesterday gives a meaningful change.
            rank_change = yesterday_rank - rank if yesterday_rank > 0 else 0
            comprehensive_ranking["data"].append({
                "rank": rank,
                "title": video.get("mix_name", ""),
                "play_vv": video.get("play_vv", 0),
                "author": video.get("author", ""),
                "video_id": str(video.get("_id", "")),
                "video_url": video.get("video_url", ""),
                "cover_image_url": video.get("cover_image_url", ""),
                "playcount_str": video.get("playcount", ""),
                # Day-over-day comparison data.
                "timeline_data": {
                    "is_new": item["is_new"],
                    "rank_change": rank_change,
                    "play_vv_change": item["play_vv_change"],
                    "play_vv_change_rate": item["play_vv_change_rate"],
                    "yesterday_rank": yesterday_rank,
                    "yesterday_play_vv": 0 if item["is_new"] else yesterday_entry.get("play_vv", 0),
                },
            })
        return comprehensive_ranking

    def setup_schedule(self):
        """Register run_douyin_scraper to run at minute :00 of every hour."""
        schedule.every().hour.at(":00").do(self.run_douyin_scraper)
        logging.info("⏰ 定时器已设置:每小时整点执行抖音播放量抓取")

    def show_next_run(self):
        """Log the next run time of the first registered job, if any."""
        jobs = schedule.get_jobs()
        if jobs:
            logging.info(f"⏰ 下次执行时间: {jobs[0].next_run}")

    def run_once(self):
        """Run one scrape (followed by ranking generation) immediately."""
        logging.info("🔧 立即执行模式...")
        self.run_douyin_scraper()

    def run_test(self):
        """Test mode: run one scrape immediately (same work as run_once)."""
        logging.info("🧪 测试模式 - 立即执行抖音播放量抓取任务...")
        self.run_douyin_scraper()

    def run_ranking_only(self):
        """Regenerate today's ranking from stored data without scraping.

        Returns:
            The result of generate_daily_rankings() (True if a ranking was stored).
        """
        logging.info("📊 仅生成榜单模式...")
        return self.generate_daily_rankings()

    def start_scheduler(self, status_interval_s=600):
        """Run the schedule loop until Ctrl+C.

        Pending jobs are checked about once per second. The next run time is
        logged every ``status_interval_s`` seconds, measured on a monotonic
        clock so the status line is not skipped when a job runs long or a
        sleep overshoots a particular second.
        """
        self.is_running = True
        logging.info("🚀 抖音播放量自动抓取定时器已启动")
        logging.info("⏰ 执行时间:每小时整点执行抖音播放量抓取")
        logging.info("⏹️ 按 Ctrl+C 停止定时器")
        last_status_at = time.monotonic()
        try:
            while self.is_running:
                schedule.run_pending()
                time.sleep(1)
                now = time.monotonic()
                if now - last_status_at >= status_interval_s:
                    self.show_next_run()
                    last_status_at = now
        except KeyboardInterrupt:
            logging.info("\n⏹️ 定时器已停止")
            self.is_running = False
def main():
    """Parse the command-line flags and run the requested mode.

    ``--test`` and ``--once`` scrape immediately, ``--ranking-only`` only
    rebuilds today's ranking, and no flag starts the hourly scheduler (with
    console logging reduced to warnings).

    Returns:
        0 on success, 1 if an exception escaped (it is printed with its traceback).
    """
    try:
        parser = argparse.ArgumentParser(description='抖音播放量自动抓取定时器')
        parser.add_argument('--test', action='store_true', help='测试模式 - 立即执行一次')
        parser.add_argument('--once', action='store_true', help='立即执行一次并退出')
        parser.add_argument('--ranking-only', action='store_true', help='仅生成榜单(不抓取数据)')
        args = parser.parse_args()

        # Only the long-running timer mode uses quiet console logging.
        quiet_mode = not (args.test or args.once or args.ranking_only)
        setup_logging(quiet_mode=quiet_mode)

        print("正在初始化定时器...")
        scheduler = DouyinAutoScheduler()

        if args.test:
            scheduler._is_timer_mode = False
            print("执行测试模式...")
            scheduler.run_test()
        elif args.once:
            scheduler._is_timer_mode = False
            print("执行单次模式...")
            scheduler.run_once()
        elif args.ranking_only:
            scheduler._is_timer_mode = False
            print("执行榜单生成模式...")
            scheduler.run_ranking_only()
        else:
            scheduler._is_timer_mode = True
            print("启动定时器模式...")
            # Console is quiet in this mode, so print the timer info directly.
            current_time = datetime.now()
            print(f"🕐 当前时间:{current_time.strftime('%Y-%m-%d %H:%M:%S')}")
            print("⏰ 执行规则:每小时整点执行抖音播放量抓取")

            # Next top of the hour. timedelta arithmetic rolls over midnight;
            # replace(hour=hour + 1) would raise ValueError at 23:xx.
            hour_start = current_time.replace(minute=0, second=0, microsecond=0)
            if current_time > hour_start:
                next_hour = hour_start + timedelta(hours=1)
            else:
                next_hour = hour_start
            wait_seconds = (next_hour - current_time).total_seconds()
            wait_minutes = int(wait_seconds // 60)
            print(f"⏰ 下次执行时间:{next_hour.strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"⏳ 距离下次执行:{wait_minutes} 分钟 ({int(wait_seconds)} 秒)")
            print("💡 定时器正在等待中,将在整点自动执行任务...")
            print("⏹️ 按 Ctrl+C 停止定时器")

            scheduler.setup_schedule()
            scheduler.start_scheduler()

        print("程序执行完成")
    except Exception as e:
        print(f"程序执行出错: {e}")
        import traceback
        traceback.print_exc()
        return 1
    return 0
if __name__ == '__main__':
    # Propagate main()'s 0/1 result as the process exit status so callers
    # (cron, supervisors, CI) can detect failures.
    sys.exit(main())