#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 抖音播放量自动抓取定时器 - 跨平台版本 功能: - 每晚24:00自动执行抖音播放量抓取任务 - 支持Windows、macOS、Linux - 自动保存数据到MongoDB """ import schedule import time import subprocess import sys import os import logging from pathlib import Path from datetime import datetime # 配置日志的函数 def setup_logging(): """设置日志配置""" # 确保logs目录存在 import os script_dir = os.path.dirname(os.path.abspath(__file__)) logs_dir = os.path.join(script_dir, 'handlers', 'Rankings', 'logs') os.makedirs(logs_dir, exist_ok=True) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(os.path.join(logs_dir, 'scheduler.log'), encoding='utf-8'), logging.StreamHandler() ] ) class DouyinAutoScheduler: def __init__(self): self.is_running = False def run_douyin_scraper(self): """执行抖音播放量抓取任务""" try: logging.info("🚀 开始执行抖音播放量抓取任务...") # 设置环境变量,确保自动模式 os.environ['AUTO_CONTINUE'] = '1' # 构建脚本路径 - 指向Rankings目录中的脚本 script_path = Path(__file__).parent / 'handlers' / 'Rankings' / 'rank_data_scraper.py' if not script_path.exists(): logging.error(f"❌ 脚本文件不存在: {script_path}") return logging.info(f"📁 执行脚本: {script_path}") # 使用subprocess执行脚本 result = subprocess.run([ sys.executable, str(script_path), '--auto', '--duration', '60' ], capture_output=True, text=True, encoding='utf-8', errors='ignore') if result.returncode == 0: logging.info("✅ 抖音播放量抓取任务执行成功") if result.stdout: logging.info(f"📄 输出: {result.stdout.strip()}") else: logging.error(f"❌ 任务执行失败,返回码: {result.returncode}") if result.stderr: logging.error(f"💥 错误信息: {result.stderr.strip()}") if result.stdout: logging.info(f"📄 输出: {result.stdout.strip()}") except Exception as e: logging.error(f"💥 执行任务时发生异常: {e}") def setup_schedule(self): """设置定时任务""" # 主执行时间:每晚24:00(午夜) schedule.every().day.at("00:00").do(self.run_douyin_scraper) logging.info("⏰ 定时器已设置:每晚24:00执行抖音播放量抓取") def show_next_run(self): """显示下次执行时间""" jobs = schedule.get_jobs() if jobs: next_run = jobs[0].next_run logging.info(f"⏰ 下次执行时间: {next_run}") def run_once(self): """立即执行一次""" logging.info("🔧 立即执行模式...") self.run_douyin_scraper() def run_test(self): """测试模式 - 立即执行一次""" logging.info("🧪 测试模式 - 立即执行抖音播放量抓取任务...") self.run_douyin_scraper() def start_scheduler(self): """启动定时器""" self.is_running = True logging.info("🚀 抖音播放量自动抓取定时器已启动") logging.info("⏰ 执行时间:每晚24:00") logging.info("📁 目标脚本:rank_data_scraper.py") logging.info("💾 数据保存:MongoDB") logging.info("⏹️ 按 Ctrl+C 停止定时器") self.show_next_run() try: while self.is_running: schedule.run_pending() time.sleep(1) # 每分钟显示一次状态 if int(time.time()) % 60 == 0: self.show_next_run() except KeyboardInterrupt: logging.info("\n⏹️ 定时器已停止") self.is_running = False def main(): """主函数""" import argparse parser = argparse.ArgumentParser(description='抖音播放量自动抓取定时器') parser.add_argument('--test', action='store_true', help='测试模式 - 立即执行一次') parser.add_argument('--once', action='store_true', help='立即执行一次并退出') args = parser.parse_args() # 设置日志配置 setup_logging() scheduler = DouyinAutoScheduler() if args.test: scheduler.run_test() elif args.once: scheduler.run_once() else: scheduler.setup_schedule() scheduler.start_scheduler() if __name__ == '__main__': main()