#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 抖音播放量自动抓取定时器 - 跨平台版本 功能: - 每晚自动执行抖音播放量抓取任务 - 数据抓取完成后自动生类榜单 - 支持Windows、macOS、Linux - 自动保存数据到MongoDB 使用方法: - 正常模式:python Timer_worker.py(启动定时器) - 测试模式:python Timer_worker.py --test(立即执行一次) - 单次执行:python Timer_worker.py --once(立即执行一次并退出) - 仅生成榜单:python Timer_worker.py --ranking-only(仅生成榜单,不抓取数据) """ import schedule import time import sys import os import logging import argparse from datetime import datetime, date, timedelta import config # 添加项目路径到 Python 路径 sys.path.append(os.path.join(os.path.dirname(__file__), 'handlers', 'Rankings')) from handlers.Rankings.rank_data_scraper import DouyinPlayVVScraper # 配置日志的函数 def setup_timer_environment(): """设置定时器相关的环境变量""" config.apply_timer_environment() for key, value in config.TIMER_ENV_CONFIG.items(): logging.info(f"设置环境变量: {key}={value}") def setup_logging(quiet_mode=False): """设置日志配置""" # 确保logs目录存在 script_dir = os.path.dirname(os.path.abspath(__file__)) logs_dir = os.path.join(script_dir, 'handlers', 'Rankings', 'logs') os.makedirs(logs_dir, exist_ok=True) # 在安静模式下,只记录WARNING及以上级别的日志到控制台 console_level = logging.WARNING if quiet_mode else logging.INFO logging.basicConfig( level=logging.INFO, # 文件日志仍然记录所有INFO级别 format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(os.path.join(logs_dir, 'scheduler.log'), encoding='utf-8'), logging.StreamHandler() ] ) # 如果是安静模式,调整控制台处理器的级别 if quiet_mode: for handler in logging.getLogger().handlers: if isinstance(handler, logging.StreamHandler) and not isinstance(handler, logging.FileHandler): handler.setLevel(console_level) class DouyinAutoScheduler: def __init__(self): self.is_running = False def _normalize_play_vv(self, play_vv): """标准化播放量数据类型,将字符串转换为数字""" if isinstance(play_vv, str): try: return int(play_vv.replace(',', '').replace('万', '0000').replace('亿', '00000000')) except: return 0 elif not isinstance(play_vv, (int, float)): return 0 return play_vv def _deduplicate_videos_by_mix_name(self, videos, include_rank=False): """按短剧名称去重,保留播放量最高的记录""" unique_data = {} for video in videos: mix_name = video.get("mix_name", "") if mix_name: # 标准化播放量数据类型 play_vv = self._normalize_play_vv(video.get("play_vv", 0)) if mix_name not in unique_data or play_vv > unique_data[mix_name].get("play_vv", 0): if include_rank: # 用于昨天数据的格式 unique_data[mix_name] = { "play_vv": play_vv, "video_id": str(video.get("_id", "")), "rank": 0 # 稍后计算排名 } else: # 用于今天数据的格式,直接更新原视频对象 video["play_vv"] = play_vv unique_data[mix_name] = video return unique_data def run_douyin_scraper(self): """执行抖音播放量抓取任务""" try: logging.warning("🚀 开始执行抖音播放量抓取任务...") # 设置环境变量,确保定时器模式和自动模式 setup_timer_environment() # 直接创建并运行 DouyinPlayVVScraper 实例 scraper = DouyinPlayVVScraper( start_url="https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation", auto_continue=True, duration_s=60 ) print("📁 开始执行抓取任务...") logging.info("📁 开始执行抓取任务...") scraper.run() print("✅ 抖音播放量抓取任务执行成功") logging.info("✅ 抖音播放量抓取任务执行成功") # 数据抓取完成后,自动生成当日榜单 self.generate_daily_rankings() # 任务完成后立即显示下次执行时间 print("🎯 任务完成,准备下次执行...") self.show_next_run() print("💡 定时器正在等待中,将在整点自动执行任务...") logging.info("🎯 任务完成,准备下次执行...") logging.info("💡 定时器正在等待中,将在整点自动执行任务...") except Exception as e: logging.error(f"💥 执行任务时发生异常: {e}") import traceback logging.error(f"详细错误信息: {traceback.format_exc()}") def generate_daily_rankings(self): """生成每日榜单数据(基于时间轴对比)""" try: from database import db from datetime import timedelta # 获取集合 douyin_collection = db['Ranking_storage_list'] # 使用定时器抓取的数据 rankings_collection = db['Ranking_storage'] today = date.today() yesterday = today - timedelta(days=1) today_str = today.strftime('%Y-%m-%d') yesterday_str = yesterday.strftime('%Y-%m-%d') logging.info(f"📅 正在生成 {today_str} 的榜单(对比 {yesterday_str})...") # 删除当天已有的榜单数据 rankings_collection.delete_many({"date": today_str}) print(f"🗑️ 已清理 {today_str} 的旧榜单数据") logging.info(f"🗑️ 已清理 {today_str} 的旧榜单数据") # 获取今天和昨天的榜单数据进行对比 try: print("🔄 正在生成时间轴对比榜单...") logging.info("🔄 正在生成时间轴对比榜单...") # 获取最新批次的数据 latest_batch = douyin_collection.find_one(sort=[("batch_time", -1)]) if not latest_batch: logging.warning("⚠️ 未找到任何数据") return False latest_batch_time = latest_batch.get("batch_time") logging.info(f"📊 找到最新批次时间: {latest_batch_time}") # 只获取最新批次的数据 today_videos_raw = list(douyin_collection.find({"batch_time": latest_batch_time}).sort("play_vv", -1)) logging.info(f"📊 最新批次数据数量: {len(today_videos_raw)}") # 按短剧名称去重,每个短剧只保留播放量最高的一条 unique_videos = {} for video in today_videos_raw: mix_name = video.get("mix_name", "") if mix_name and (mix_name not in unique_videos or video.get("play_vv", 0) > unique_videos[mix_name].get("play_vv", 0)): unique_videos[mix_name] = video today_videos = list(unique_videos.values()) logging.info(f"📊 今日数据去重后:{len(today_videos)} 个独特短剧(原始数据:{len(today_videos_raw)} 条)") # 从Ranking_storage_list中获取昨天最后一次抓取的数据 yesterday_start = datetime.combine(yesterday, datetime.min.time()) yesterday_end = datetime.combine(yesterday, datetime.max.time()) # 获取昨天的最后一次抓取数据(按batch_time排序取最新的) yesterday_latest_batch = douyin_collection.find_one({ "batch_time": { "$gte": yesterday_start, "$lte": yesterday_end } }, sort=[("batch_time", -1)]) yesterday_data = {} if yesterday_latest_batch: yesterday_batch_time = yesterday_latest_batch.get("batch_time") logging.info(f"📊 找到昨天最后一次抓取时间: {yesterday_batch_time}") # 获取昨天最后一次抓取的所有数据 yesterday_videos_raw = list(douyin_collection.find({ "batch_time": yesterday_batch_time }).sort("play_vv", -1)) # 按短剧名称去重,每个短剧只保留播放量最高的一条 unique_yesterday_videos = {} for video in yesterday_videos_raw: mix_name = video.get("mix_name", "") if mix_name and (mix_name not in unique_yesterday_videos or video.get("play_vv", 0) > unique_yesterday_videos[mix_name].get("play_vv", 0)): unique_yesterday_videos[mix_name] = video # 将昨天的数据转换为字典,以短剧名称为键 for mix_name, video in unique_yesterday_videos.items(): yesterday_data[mix_name] = { "rank": 0, # 原始数据没有排名,设为0 "play_vv": video.get("play_vv", 0), "video_id": str(video.get("_id", "")) } logging.info(f"📊 找到昨天的原始数据,共 {len(yesterday_data)} 个短剧(原始数据:{len(yesterday_videos_raw)} 条)") else: logging.info("📊 未找到昨天的原始数据,将作为首次生成") if today_videos: # 先计算所有视频的播放量差值 videos_with_growth = [] for video in today_videos: video_id = str(video.get("_id", "")) current_play_vv = video.get("play_vv", 0) # 计算与昨天的对比数据 play_vv_change = 0 play_vv_change_rate = 0 is_new = True mix_name = video.get("mix_name", "") if mix_name in yesterday_data: is_new = False yesterday_play_vv = yesterday_data[mix_name]["play_vv"] # 计算播放量变化 play_vv_change = current_play_vv - yesterday_play_vv if yesterday_play_vv > 0: play_vv_change_rate = round((play_vv_change / yesterday_play_vv) * 100, 2) # 创建包含增长数据的视频项 video_with_growth = { "video": video, "play_vv_change": play_vv_change, "play_vv_change_rate": play_vv_change_rate, "is_new": is_new, "yesterday_data": yesterday_data.get(mix_name, {}) } videos_with_growth.append(video_with_growth) # 按播放量差值降序排序(差值越大排名越靠前) videos_with_growth.sort(key=lambda x: x["play_vv_change"], reverse=True) comprehensive_ranking = { "date": today_str, "type": "comprehensive", "name": "播放量增长榜单", "description": f"基于 {yesterday_str} 和 {today_str} 播放量差值排序的榜单(差值越大排名越靠前)", "comparison_date": yesterday_str, "total_videos": len(videos_with_growth), "data": [] } # 获取Rankings_management集合用于补充详细信息 rankings_management_collection = db['Rankings_management'] # 生成排序后的榜单数据 for i, item in enumerate(videos_with_growth, 1): video = item["video"] video_id = str(video.get("_id", "")) current_play_vv = video.get("play_vv", 0) mix_name = video.get("mix_name", "") # 计算排名变化(基于昨天的排名) rank_change = 0 if not item["is_new"] and item["yesterday_data"]: yesterday_rank = item["yesterday_data"].get("rank", 0) rank_change = yesterday_rank - i # 🔍 从Rankings_management获取详细信息 management_data = rankings_management_collection.find_one({"mix_name": mix_name}) ranking_item = { # 🎯 核心榜单字段 "rank": i, "title": mix_name, "mix_name": mix_name, # 确保包含mix_name字段用于同步 "play_vv": current_play_vv, "series_author": video.get("series_author", ""), "video_id": video_id, "video_url": video.get("video_url", ""), "cover_image_url": video.get("cover_image_url", ""), "playcount_str": video.get("playcount", ""), # 📋 从Rankings_management获取的详细字段 "batch_id": management_data.get("batch_id", "") if management_data else "", "batch_time": management_data.get("batch_time") if management_data else None, "item_sequence": management_data.get("item_sequence", 0) if management_data else 0, "mix_id": management_data.get("mix_id", "") if management_data else "", "playcount": management_data.get("playcount", "") if management_data else "", "request_id": management_data.get("request_id", "") if management_data else "", "cover_image_url_original": management_data.get("cover_image_url_original", "") if management_data else "", "cover_upload_success": management_data.get("cover_upload_success", True) if management_data else True, "cover_backup_urls": management_data.get("cover_backup_urls", []) if management_data else [], "desc": management_data.get("desc", "") if management_data else "", "updated_to_episode": management_data.get("updated_to_episode", 0) if management_data else 0, "episode_video_ids": management_data.get("episode_video_ids", []) if management_data else [], "episode_details": management_data.get("episode_details", []) if management_data else [], "data_status": management_data.get("data_status", "") if management_data else "", "realtime_saved": management_data.get("realtime_saved", True) if management_data else True, "created_at": management_data.get("created_at") if management_data else None, "last_updated": management_data.get("last_updated") if management_data else None, "Manufacturing_Field": management_data.get("Manufacturing_Field", "") if management_data else "", "Copyright_field": management_data.get("Copyright_field", "") if management_data else "", "Novel_IDs": management_data.get("Novel_IDs", []) if management_data else [], "Anime_IDs": management_data.get("Anime_IDs", []) if management_data else [], "Drama_IDs": management_data.get("Drama_IDs", []) if management_data else [], # 📊 时间轴对比数据(重要:包含播放量差值) "timeline_data": { "is_new": item["is_new"], "rank_change": rank_change, "play_vv_change": item["play_vv_change"], "play_vv_change_rate": item["play_vv_change_rate"], "yesterday_rank": item["yesterday_data"].get("rank", 0) if not item["is_new"] else 0, "yesterday_play_vv": item["yesterday_data"].get("play_vv", 0) if not item["is_new"] else 0 } } comprehensive_ranking["data"].append(ranking_item) # 为每次计算添加唯一的时间戳,确保数据唯一性 current_timestamp = datetime.now() comprehensive_ranking["created_at"] = current_timestamp comprehensive_ranking["calculation_id"] = f"{today_str}_{current_timestamp.strftime('%H%M%S')}" # 检查今天已有多少次计算 existing_count = rankings_collection.count_documents({ "date": today_str, "type": "comprehensive" }) comprehensive_ranking["calculation_sequence"] = existing_count + 1 # 总是插入新的榜单记录,保留所有历史计算数据 rankings_collection.insert_one(comprehensive_ranking) logging.info(f"📝 创建了新的今日榜单数据(第{existing_count + 1}次计算,包含最新差值)") logging.info(f"🔖 计算ID: {comprehensive_ranking['calculation_id']}") # 📊 检查数据完整性:统计从Rankings_management成功获取详细信息的项目数量 total_items = len(comprehensive_ranking["data"]) items_with_management_data = 0 items_with_manufacturing = 0 items_with_copyright = 0 for item in comprehensive_ranking["data"]: # 检查是否从Rankings_management获取到了数据 if item.get("batch_id") or item.get("desc") or item.get("Manufacturing_Field") or item.get("Copyright_field"): items_with_management_data += 1 if item.get("Manufacturing_Field"): items_with_manufacturing += 1 if item.get("Copyright_field"): items_with_copyright += 1 print(f"📊 数据完整性统计:") print(f" 总项目数: {total_items}") print(f" 从Rankings_management获取到详细信息: {items_with_management_data}") print(f" 包含Manufacturing_Field: {items_with_manufacturing}") print(f" 包含Copyright_field: {items_with_copyright}") logging.info(f"📊 数据完整性: 总{total_items}项,获取详细信息{items_with_management_data}项,Manufacturing_Field: {items_with_manufacturing},Copyright_field: {items_with_copyright}") # 统计信息 new_count = sum(1 for item in comprehensive_ranking["data"] if item["timeline_data"]["is_new"]) print(f"✅ 时间轴对比榜单生成成功") print(f"📊 总计 {len(comprehensive_ranking['data'])} 条记录") print(f"🆕 新上榜 {new_count} 条") print(f"🔄 对比基准日期: {yesterday_str}") logging.info(f"✅ 时间轴对比榜单生成成功") logging.info(f"📊 总计 {len(comprehensive_ranking['data'])} 条记录") logging.info(f"🆕 新上榜 {new_count} 条") logging.info(f"🔄 对比基准日期: {yesterday_str}") return True else: logging.warning("⚠️ 榜单生成失败:无今日数据") return False except Exception as e: logging.error(f"💥 生成时间轴对比榜单时发生异常: {e}") import traceback logging.error(f"详细错误信息: {traceback.format_exc()}") return False except Exception as e: logging.error(f"💥 生成榜单时发生异常: {e}") import traceback logging.error(f"详细错误信息: {traceback.format_exc()}") def check_and_sync_missing_fields(self): """实时检查并同步当天缺失字段""" try: from database import db # 只检查当天的数据 today = date.today() today_str = today.strftime('%Y-%m-%d') # 首先检查 Rankings_management 是否有当天的数据 rankings_management_collection = db['Rankings_management'] management_count = rankings_management_collection.count_documents({}) if management_count == 0: # Rankings_management 没有数据,说明还没有抓取,直接返回 return rankings_collection = db['Ranking_storage'] key_fields = ['Manufacturing_Field', 'Copyright_field', 'desc', 'series_author'] # 检查今天是否有缺失字段的数据 missing_conditions = [] for field in key_fields: missing_conditions.extend([ {field: {"$exists": False}}, {field: None}, {field: ""} ]) today_missing_count = rankings_collection.count_documents({ "date": today_str, "$or": missing_conditions }) # 如果今天没有缺失数据,静默返回 if today_missing_count == 0: return logging.info(f"🔍 检测到今天有 {today_missing_count} 条缺失字段,Rankings_management有 {management_count} 条数据,开始实时同步...") # 只处理当天的数据 dates_to_check = [today_str] total_missing = 0 total_synced = 0 for check_date in dates_to_check: # 查询该日期缺失字段的数据 rankings_collection = db['Ranking_storage'] # 检查多个关键字段(包括新增的分类字段) key_fields = ['Manufacturing_Field', 'Copyright_field', 'desc', 'series_author', 'Novel_IDs', 'Anime_IDs', 'Drama_IDs'] missing_conditions = [] for field in key_fields: missing_conditions.extend([ {field: {"$exists": False}}, {field: None}, {field: ""} ]) missing_query = { "date": check_date, "$or": missing_conditions } missing_count = rankings_collection.count_documents(missing_query) # 详细统计每个字段的缺失情况 field_stats = {} total_items = rankings_collection.count_documents({"date": check_date}) for field in key_fields: missing_field_count = rankings_collection.count_documents({ "date": check_date, "$or": [ {field: {"$exists": False}}, {field: None}, {field: ""} ] }) field_stats[field] = { "missing": missing_field_count, "completion_rate": ((total_items - missing_field_count) / total_items * 100) if total_items > 0 else 0 } if missing_count > 0: logging.info(f"📅 今日({check_date}): 发现 {missing_count} 条记录缺失字段(总计 {total_items} 条)") # 输出详细的字段统计 for field, stats in field_stats.items(): if stats["missing"] > 0: logging.info(f" - {field}: 缺失 {stats['missing']} 条 ({stats['completion_rate']:.1f}% 完整)") total_missing += missing_count # 尝试同步 try: from routers.rank_api_routes import sync_ranking_storage_fields # 使用改进的重试机制 sync_result = sync_ranking_storage_fields( target_date=check_date, force_update=False, max_retries=2, # 定期检查时重试2次 retry_delay=15 # 15秒重试间隔 ) if sync_result.get("success", False): stats = sync_result.get("stats", {}) synced = stats.get("updated_items", 0) retry_count = stats.get("retry_count", 0) pending_final = stats.get("pending_items_final", 0) total_synced += synced if synced > 0: logging.info(f"✅ 今日({check_date}): 成功同步 {synced} 条记录") if retry_count > 0: logging.info(f"🔄 今日({check_date}): 使用了 {retry_count} 次重试") if pending_final > 0: logging.warning(f"⚠️ 今日({check_date}): {pending_final} 条记录在 Rankings_management 中仍未找到") else: logging.warning(f"⚠️ 今日({check_date}): 同步失败 - {sync_result.get('message', '')}") except Exception as sync_error: logging.error(f"💥 今日({check_date}): 同步过程出错 - {sync_error}") else: if total_items > 0: logging.info(f"📅 {check_date}: 所有字段完整(总计 {total_items} 条记录)") # 显示完整性统计 for field, stats in field_stats.items(): logging.info(f" - {field}: {stats['completion_rate']:.1f}% 完整") else: logging.info(f"📅 {check_date}: 无数据") if total_missing > 0: logging.info(f"🔍 当天同步完成:发现 {total_missing} 条缺失记录,成功同步 {total_synced} 条") print(f"🔍 当天字段同步:发现 {total_missing} 条缺失,同步 {total_synced} 条") else: # 当天没有缺失数据时,不输出日志(静默模式) pass except Exception as e: logging.error(f"💥 检查缺失字段时发生异常: {e}") import traceback logging.error(f"详细错误信息: {traceback.format_exc()}") def setup_schedule(self): """设置定时任务""" # 每小时的整点执行抖音播放量抓取 schedule.every().hour.at(":00").do(self.run_douyin_scraper) # 每1分钟检查一次缺失字段并尝试同步(实时同步) schedule.every(1).minutes.do(self.check_and_sync_missing_fields) logging.info(f"⏰ 定时器已设置:每小时整点执行抖音播放量抓取") logging.info(f"⏰ 定时器已设置:每1分钟检查缺失字段并同步(实时模式)") def show_next_run(self): """显示下次执行时间""" jobs = schedule.get_jobs() if jobs: next_run = jobs[0].next_run current_time = datetime.now() wait_seconds = (next_run - current_time).total_seconds() wait_minutes = int(wait_seconds // 60) wait_hours = int(wait_minutes // 60) remaining_minutes = wait_minutes % 60 print(f"💡 定时器运行中,下次执行:{next_run.strftime('%Y-%m-%d %H:%M:%S')} (还有{wait_hours}h{remaining_minutes}m)") print(f"⏳ 距离下次执行:{wait_minutes} 分钟 ({int(wait_seconds)} 秒)") logging.info(f"💡 定时器运行中,下次执行:{next_run.strftime('%Y-%m-%d %H:%M:%S')} (还有{wait_hours}h{remaining_minutes}m)") logging.info(f"⏳ 距离下次执行:{wait_minutes} 分钟 ({int(wait_seconds)} 秒)") def run_once(self): """立即执行一次""" logging.info("🔧 立即执行模式...") self.run_douyin_scraper() def run_test(self): """测试模式 - 立即执行一次""" logging.info("🧪 测试模式 - 立即执行抖音播放量抓取任务...") self.run_douyin_scraper() def run_ranking_only(self): """仅生成榜单(不抓取数据)""" logging.info("📊 仅生成榜单模式...") self.generate_daily_rankings() def start_scheduler(self): """启动定时器""" self.is_running = True last_status_time = int(time.time()) # 设置为当前时间,1分钟后开始显示状态 print("🚀 抖音播放量自动抓取定时器已启动") print("⏰ 执行时间:每小时整点执行抖音播放量抓取") print("⏹️ 按 Ctrl+C 停止定时器") logging.info("🚀 抖音播放量自动抓取定时器已启动") logging.info(f"⏰ 执行时间:每小时整点执行抖音播放量抓取") logging.info("⏹️ 按 Ctrl+C 停止定时器") # 启动时显示一次下次执行时间 self.show_next_run() try: while self.is_running: schedule.run_pending() time.sleep(1) # 每1分钟显示一次状态 current_time = int(time.time()) if current_time - last_status_time >= 60: # 60秒 = 1分钟 self.show_next_run() last_status_time = current_time except KeyboardInterrupt: print("\n⏹️ 定时器已停止") logging.info("\n⏹️ 定时器已停止") self.is_running = False def main(): """主函数""" try: parser = argparse.ArgumentParser(description='抖音播放量自动抓取定时器') parser.add_argument('--test', action='store_true', help='测试模式 - 立即执行一次') parser.add_argument('--once', action='store_true', help='立即执行一次并退出') parser.add_argument('--ranking-only', action='store_true', help='仅生成榜单(不抓取数据)') args = parser.parse_args() # 设置日志配置 - 只在定时器模式下启用静默模式 quiet_mode = not (args.test or args.once or args.ranking_only) setup_logging(quiet_mode=quiet_mode) print("正在初始化定时器...") scheduler = DouyinAutoScheduler() if args.test: scheduler._is_timer_mode = False print("执行测试模式...") scheduler.run_test() elif args.once: scheduler._is_timer_mode = False print("执行单次模式...") scheduler.run_once() elif args.ranking_only: scheduler._is_timer_mode = False print("执行榜单生成模式...") scheduler.run_ranking_only() else: scheduler._is_timer_mode = True print("启动定时器模式...") # 显示定时器信息(使用print确保能看到) from datetime import datetime current_time = datetime.now() print(f"🕐 当前时间:{current_time.strftime('%Y-%m-%d %H:%M:%S')}") print(f"⏰ 执行规则:每小时整点执行抖音播放量抓取") # 计算下次执行时间 next_hour = current_time.replace(minute=0, second=0, microsecond=0) if current_time.minute > 0 or current_time.second > 0: next_hour = next_hour.replace(hour=next_hour.hour + 1) if next_hour.hour >= 24: from datetime import timedelta next_hour = next_hour.replace(hour=0) + timedelta(days=1) wait_seconds = (next_hour - current_time).total_seconds() wait_minutes = int(wait_seconds // 60) print(f"⏰ 下次执行时间:{next_hour.strftime('%Y-%m-%d %H:%M:%S')}") print(f"⏳ 距离下次执行:{wait_minutes} 分钟 ({int(wait_seconds)} 秒)") print("💡 定时器正在等待中,将在整点自动执行任务...") print("⏹️ 按 Ctrl+C 停止定时器") scheduler.setup_schedule() scheduler.start_scheduler() print("程序执行完成") except Exception as e: print(f"程序执行出错: {e}") import traceback traceback.print_exc() return 1 return 0 if __name__ == '__main__': main()