#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
抖音播放量自动抓取定时器 - 跨平台版本
功能:
- 每晚自动执行抖音播放量抓取任务
- 数据抓取完成后自动生类榜单
- 支持Windows、macOS、Linux
- 自动保存数据到MongoDB
使用方法:
- 正常模式python Timer_worker.py启动定时器
- 测试模式python Timer_worker.py --test立即执行一次
- 单次执行python Timer_worker.py --once立即执行一次并退出
- 仅生成榜单python Timer_worker.py --ranking-only仅生成榜单不抓取数据
"""
import schedule
import time
import sys
import os
import logging
import argparse
from datetime import datetime, date, timedelta
import config
# Add the project path to the Python path
sys.path.append(os.path.join(os.path.dirname(__file__), 'handlers', 'Rankings'))
from handlers.Rankings.rank_data_scraper import DouyinPlayVVScraper
def setup_timer_environment():
    """Set the environment variables used by the scheduler."""
    config.apply_timer_environment()
    for key, value in config.TIMER_ENV_CONFIG.items():
        logging.info(f"Environment variable set: {key}={value}")
def setup_logging(quiet_mode=False):
    """Configure logging."""
    # Make sure the logs directory exists
    script_dir = os.path.dirname(os.path.abspath(__file__))
    logs_dir = os.path.join(script_dir, 'handlers', 'Rankings', 'logs')
    os.makedirs(logs_dir, exist_ok=True)
    # In quiet mode, only WARNING and above are shown on the console
    console_level = logging.WARNING if quiet_mode else logging.INFO
    logging.basicConfig(
        level=logging.INFO,  # the log file still records everything from INFO up
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(os.path.join(logs_dir, 'scheduler.log'), encoding='utf-8'),
            logging.StreamHandler()
        ]
    )
    # In quiet mode, raise the console handler's level
    # (FileHandler subclasses StreamHandler, so it must be excluded explicitly)
    if quiet_mode:
        for handler in logging.getLogger().handlers:
            if isinstance(handler, logging.StreamHandler) and not isinstance(handler, logging.FileHandler):
                handler.setLevel(console_level)
class DouyinAutoScheduler:
    def __init__(self):
        self.is_running = False
        # Logger instance for this class
        self.logger = logging.getLogger(__name__)
    def _sync_episode_details_with_lock(self, episode_details, comments_summary):
        """
        Handle the comment-lock logic when syncing episode_details.
        If comments_summary is present, the comment content is kept locked and
        only the interaction data is expected to change.
        Args:
            episode_details: episode_details from the management database
            comments_summary: the comment-summary field
        Returns:
            The episode_details to use (currently the input, unchanged).
        """
        # Without comments_summary (or without episode_details), return the data as-is
        if not comments_summary or not episode_details:
            return episode_details
        # A comments_summary means the comment content is locked; return the management
        # database's copy directly, since it already holds the locked comment content
        logging.info('🔒 comments_summary detected; episode_details stays locked (comment content preserved)')
        return episode_details
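    # Illustrative passthrough behavior (doctest-style, values hypothetical): both
    # branches return the input unchanged; a comments_summary only changes the logging.
    #   >>> s = DouyinAutoScheduler()
    #   >>> s._sync_episode_details_with_lock([], "summary")
    #   []
    #   >>> s._sync_episode_details_with_lock([{"ep": 1}], "summary")
    #   [{'ep': 1}]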
    def _normalize_play_vv(self, play_vv):
        """Normalize the play-count type: convert strings to numbers."""
        if isinstance(play_vv, str):
            try:
                # "万" = 10^4 and "亿" = 10^8; strip thousands separators first
                return int(play_vv.replace(',', '').replace('万', '0000').replace('亿', '00000000'))
            except (ValueError, AttributeError):
                return 0
        elif not isinstance(play_vv, (int, float)):
            return 0
        return play_vv
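    # Doctest-style examples for _normalize_play_vv (illustrative):
    #   >>> s = DouyinAutoScheduler()
    #   >>> s._normalize_play_vv("12,345")
    #   12345
    #   >>> s._normalize_play_vv("3万")
    #   30000
    #   >>> s._normalize_play_vv(None)
    #   0
    # Caveat: decimal forms such as "1.2万" do not parse as int and fall back to 0.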
    def check_browser_login_status(self):
        """Check the browser login state and prompt the user to log in if needed."""
        try:
            import glob
            script_dir = os.path.dirname(os.path.abspath(__file__))
            profile_dir = os.path.join(script_dir, 'config', 'chrome_profile_timer', 'douyin_persistent')
            # Heuristic: a near-empty profile directory usually means no login yet
            profile_files = glob.glob(os.path.join(profile_dir, "*"))
            if len(profile_files) < 5:
                print("⚠️ The scheduler's browser profile may not be logged in")
                print("   Please log in to Douyin in the browser and go to [Me] → [Favorites] → [Collections]")
                print("   Press Enter to continue when done...")
                input()
            else:
                print("✅ Scheduler browser profile is configured, continuing...")
        except Exception as e:
            logging.warning(f"Error while checking the browser login state: {e}")
            print("⚠️ Browser state check failed; make sure the browser is configured correctly")
            print("   Press Enter to continue when done...")
            input()
    def _cleanup_chrome_processes(self):
        """Terminate Chrome processes that may be holding the profile directory."""
        try:
            import psutil
            # Resolve the profile directory used by the scheduler
            script_dir = os.path.dirname(os.path.abspath(__file__))
            profile_dir = os.path.join(script_dir, 'config', 'chrome_profile_timer', 'douyin_persistent')
            # Find Chrome processes whose command line references this profile
            killed_processes = []
            for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
                try:
                    if proc.info['name'] and 'chrome' in proc.info['name'].lower():
                        cmdline = proc.info['cmdline']
                        if cmdline and any(profile_dir in arg for arg in cmdline):
                            proc.terminate()
                            killed_processes.append(proc.info['pid'])
                            logging.info(f'Terminated Chrome process holding the profile: PID {proc.info["pid"]}')
                except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                    continue
            # Give the terminated processes a moment to exit
            if killed_processes:
                time.sleep(2)
            return len(killed_processes) > 0
        except ImportError:
            # Without psutil, skip cleanup rather than killing Chrome globally
            logging.warning('psutil is unavailable; skipping process cleanup (avoids terminating unrelated Chrome instances)')
            return False
        except Exception as e:
            logging.warning(f'Error while cleaning up Chrome processes: {e}')
            return False
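    # Illustrative match (hypothetical command line): an argument such as
    #   --user-data-dir=<script_dir>/config/chrome_profile_timer/douyin_persistent
    # contains profile_dir as a substring, so that Chrome process gets terminated.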
    def run_douyin_scraper(self):
        """Run the Douyin play-count scraping task."""
        try:
            logging.warning("🚀 Starting the Douyin play-count scraping task...")
            # Set the environment variables for scheduler/automatic mode
            setup_timer_environment()
            # Create and run a DouyinPlayVVScraper instance directly
            scraper = DouyinPlayVVScraper(
                start_url="https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation",
                auto_continue=True,
                duration_s=60  # raised to 60 seconds to allow more time for data collection
            )
            print("Starting the scraping task...")
            logging.info("📁 Starting the scraping task...")
            scraper.run()
            print("Douyin play-count scraping task finished successfully")
            logging.info("✅ Douyin play-count scraping task finished successfully")
            # Once scraping is done, generate today's ranking automatically
            self.generate_daily_rankings()
            # Show the next run time right after the task completes
            print("🎯 Task complete, preparing for the next run...")
            self.show_next_run()
            print("💡 The scheduler is waiting and will run again on the hour...")
            logging.info("🎯 Task complete, preparing for the next run...")
            logging.info("💡 The scheduler is waiting and will run again on the hour...")
        except Exception as e:
            logging.error(f"💥 Exception while running the task: {e}")
            import traceback
            logging.error(f"Full traceback: {traceback.format_exc()}")
    def generate_daily_rankings(self):
        """Generate the daily ranking data (based on a day-over-day comparison)."""
        try:
            from database import db
            # Collections
            douyin_collection = db['Ranking_storage_list']  # data captured by the scheduler
            rankings_collection = db['Ranking_storage']
            today = date.today()
            yesterday = today - timedelta(days=1)
            today_str = today.strftime('%Y-%m-%d')
            yesterday_str = yesterday.strftime('%Y-%m-%d')
            logging.info(f"📅 Generating the ranking for {today_str} (compared against {yesterday_str})...")
            # Remove any ranking data already stored for today
            rankings_collection.delete_many({"date": today_str})
            print(f"🗑️ Cleared old ranking data for {today_str}")
            logging.info(f"🗑️ Cleared old ranking data for {today_str}")
            # Compare today's ranking data against yesterday's
            try:
                print("🔄 Generating the day-over-day comparison ranking...")
                logging.info("🔄 Generating the day-over-day comparison ranking...")
                # Fetch the most recent batch
                latest_batch = douyin_collection.find_one(sort=[("batch_time", -1)])
                if not latest_batch:
                    logging.warning("⚠️ No data found")
                    return False
                latest_batch_time = latest_batch.get("batch_time")
                logging.info(f"📊 Latest batch time: {latest_batch_time}")
                # Only use records from the latest batch
                today_videos_raw = list(douyin_collection.find({"batch_time": latest_batch_time}).sort("play_vv", -1))
                logging.info(f"📊 Records in the latest batch: {len(today_videos_raw)}")
                # Dedup by drama ID: keep only the highest-play_vv record per drama
                # 🚫 Filter out records with an empty or invalid mix_id
                unique_videos = {}
                for video in today_videos_raw:
                    mix_id = video.get("mix_id", "").strip()
                    mix_name = video.get("mix_name", "").strip()
                    # Normalize play_vv so string values compare numerically
                    play_vv = self._normalize_play_vv(video.get("play_vv", 0))
                    # Skip empty or invalid mix_id values
                    if not mix_id or mix_id.lower() == "null":
                        continue
                    # Note: records with a play count of 0 are kept (they may be newly released dramas)
                    if play_vv <= 0:
                        logging.warning(f"⚠️ Record with zero play count: mix_name={mix_name}, play_vv={play_vv}; keeping it")
                    if mix_id not in unique_videos or play_vv > self._normalize_play_vv(unique_videos[mix_id].get("play_vv", 0)):
                        unique_videos[mix_id] = video
                today_videos = list(unique_videos.values())
                logging.info(f"📊 Today's data after dedup: {len(today_videos)} unique dramas (from {len(today_videos_raw)} raw records)")
                # Fetch yesterday's last capture from Ranking_storage_list
                yesterday_start = datetime.combine(yesterday, datetime.min.time())
                yesterday_end = datetime.combine(yesterday, datetime.max.time())
                # Take yesterday's most recent batch (sorted by batch_time, newest first)
                yesterday_latest_batch = douyin_collection.find_one({
                    "batch_time": {
                        "$gte": yesterday_start,
                        "$lte": yesterday_end
                    }
                }, sort=[("batch_time", -1)])
                yesterday_data = {}
                if yesterday_latest_batch:
                    yesterday_batch_time = yesterday_latest_batch.get("batch_time")
                    logging.info(f"📊 Yesterday's last capture time: {yesterday_batch_time}")
                    # Fetch all records from yesterday's last capture
                    yesterday_videos_raw = list(douyin_collection.find({
                        "batch_time": yesterday_batch_time
                    }).sort("play_vv", -1))
                    # Dedup by drama ID: keep only the highest-play_vv record per drama
                    # 🚫 Filter out records with an empty or invalid mix_id
                    unique_yesterday_videos = {}
                    for video in yesterday_videos_raw:
                        mix_id = video.get("mix_id", "").strip()
                        mix_name = video.get("mix_name", "").strip()
                        # Normalize play_vv so string values compare numerically
                        play_vv = self._normalize_play_vv(video.get("play_vv", 0))
                        # Skip empty or invalid mix_id values
                        if not mix_id or mix_id.lower() == "null":
                            continue
                        # Note: records with a play count of 0 are kept (they may be newly released dramas)
                        if play_vv <= 0:
                            logging.warning(f"⚠️ Zero play count in yesterday's data: mix_name={mix_name}, play_vv={play_vv}; keeping it")
                        if mix_id not in unique_yesterday_videos or play_vv > self._normalize_play_vv(unique_yesterday_videos[mix_id].get("play_vv", 0)):
                            unique_yesterday_videos[mix_id] = video
                    # Index yesterday's data by drama ID
                    for mix_id, video in unique_yesterday_videos.items():
                        yesterday_data[mix_id] = {
                            "rank": 0,  # the raw data carries no rank, so use 0
                            "play_vv": self._normalize_play_vv(video.get("play_vv", 0)),
                            "video_id": str(video.get("_id", ""))
                        }
                    logging.info(f"📊 Found yesterday's raw data: {len(yesterday_data)} dramas (from {len(yesterday_videos_raw)} raw records)")
                else:
                    logging.info("📊 No raw data found for yesterday; treating this as the first run")
                if today_videos:
                    # First compute the play-count delta for every video
                    videos_with_growth = []
                    for video in today_videos:
                        video_id = str(video.get("_id", ""))
                        current_play_vv = self._normalize_play_vv(video.get("play_vv", 0))
                        # Compare against yesterday's numbers
                        play_vv_change = 0
                        play_vv_change_rate = 0
                        is_new = True
                        mix_id = video.get("mix_id", "")
                        if mix_id in yesterday_data:
                            is_new = False
                            yesterday_play_vv = yesterday_data[mix_id]["play_vv"]
                            # Play-count delta since yesterday
                            play_vv_change = current_play_vv - yesterday_play_vv
                            if yesterday_play_vv > 0:
                                play_vv_change_rate = round((play_vv_change / yesterday_play_vv) * 100, 2)
                        # Attach the growth data to the video
                        video_with_growth = {
                            "video": video,
                            "play_vv_change": play_vv_change,
                            "play_vv_change_rate": play_vv_change_rate,
                            "is_new": is_new,
                            "yesterday_data": yesterday_data.get(mix_id, {})
                        }
                        videos_with_growth.append(video_with_growth)
                    # Sort by play-count delta, descending (bigger delta ranks higher)
                    videos_with_growth.sort(key=lambda x: x["play_vv_change"], reverse=True)
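                    # Growth-sort example (illustrative deltas): +500, +20 and a new
                    # item (new entries keep play_vv_change = 0) order as +500, +20, 0,
                    # so a brand-new drama ranks below anything with positive growth.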
                    comprehensive_ranking = {
                        "date": today_str,
                        "type": "comprehensive",
                        "name": "播放量增长榜单",
                        "description": f"基于 {yesterday_str} 至 {today_str} 播放量差值排序的榜单(差值越大排名越靠前)",
                        "comparison_date": yesterday_str,
                        "total_videos": len(videos_with_growth),
                        "data": []
                    }
                    # Rankings_management supplies the detailed per-drama fields
                    rankings_management_collection = db['Rankings_management']
                    # Build the sorted ranking entries
                    rank = 1  # independent rank counter
                    for item in videos_with_growth:
                        video = item["video"]
                        video_id = str(video.get("_id", ""))
                        current_play_vv = self._normalize_play_vv(video.get("play_vv", 0))
                        mix_name = video.get("mix_name", "").strip()
                        # 🚫 Skip invalid entries: mix_name must not be empty
                        # Note: entries with a play count of 0 are kept (they may be newly released dramas)
                        if not mix_name or mix_name.lower() == "null":
                            self.logger.warning(f"Skipping record with empty mix_name, video_id: {video_id}")
                            continue
                        if current_play_vv <= 0:
                            self.logger.warning(f"⚠️ Ranking entry with zero play count: mix_name={mix_name}, play_vv={current_play_vv}; keeping it")
                        # Rank change relative to yesterday's rank
                        rank_change = 0
                        if not item["is_new"] and item["yesterday_data"]:
                            yesterday_rank = item["yesterday_data"].get("rank", 0)
                            rank_change = yesterday_rank - rank  # against the current rank counter
                        # 🔍 Fetch details from Rankings_management by mix_id
                        # (the management database keeps exactly one record per drama)
                        mix_id = video.get("mix_id", "").strip()
                        management_data = None
                        if mix_id:
                            # Query by mix_id directly; no date filter needed
                            management_data = rankings_management_collection.find_one({"mix_id": mix_id})
                            if management_data:
                                logging.info(f"📋 Details found in Rankings_management: {mix_name} (mix_id: {mix_id})")
                            else:
                                logging.warning(f"⚠️ No management record found: {mix_name} (mix_id: {mix_id})")
                        else:
                            logging.warning(f"⚠️ Empty mix_id: {mix_name}")
                        ranking_item = {
                            # 🎯 Core ranking fields
                            "rank": rank,  # from the rank counter
                            "title": mix_name,
                            "mix_name": mix_name,
                            "play_vv": current_play_vv,
                            "series_author": video.get("series_author", ""),
                            "video_id": video_id,
                            "video_url": video.get("video_url", ""),
                            "cover_image_url": video.get("cover_image_url", ""),
                            "playcount_str": video.get("playcount", ""),
                            # 📋 Detail fields pulled from Rankings_management
                            "batch_id": management_data.get("batch_id", "") if management_data else "",
                            "batch_time": management_data.get("batch_time") if management_data else None,
                            "item_sequence": management_data.get("item_sequence", 0) if management_data else 0,
                            "mix_id": video.get("mix_id", ""),  # mix_id comes straight from the raw data
                            "playcount": management_data.get("playcount", "") if management_data else "",
                            "request_id": management_data.get("request_id", "") if management_data else "",
                            "cover_image_url_original": management_data.get("cover_image_url_original", "") if management_data else "",
                            "cover_upload_success": management_data.get("cover_upload_success", True) if management_data else True,
                            "cover_backup_urls": management_data.get("cover_backup_urls", []) if management_data else [],
                            "desc": management_data.get("desc", "") if management_data else "",
                            "updated_to_episode": management_data.get("updated_to_episode", 0) if management_data else 0,
                            "episode_video_ids": management_data.get("episode_video_ids", []) if management_data else [],
                            # 🔒 episode_details sync: with a comments_summary, keep the
                            # comment content and only update the interaction data
                            "episode_details": self._sync_episode_details_with_lock(
                                management_data.get("episode_details", []) if management_data else [],
                                management_data.get("comments_summary", "") if management_data else ""
                            ),
                            "data_status": management_data.get("data_status", "") if management_data else "",
                            "realtime_saved": management_data.get("realtime_saved", True) if management_data else True,
                            "created_at": management_data.get("created_at") if management_data else None,
                            "last_updated": management_data.get("last_updated") if management_data else None,
                            # 🎬 Comment summary, straight from the management database (queried by mix_id)
                            "comments_summary": management_data.get("comments_summary", "") if management_data else "",
                            # 🔑 Classification fields, straight from the management database
                            # (queried by mix_id; exactly one record per drama)
                            "Manufacturing_Field": management_data.get("Manufacturing_Field", "") if management_data else "",
                            "Copyright_field": management_data.get("Copyright_field", "") if management_data else "",
                            "classification_type": management_data.get("classification_type", "") if management_data else "",
                            "release_date": management_data.get("release_date", "") if management_data else "",
                            "Novel_IDs": management_data.get("Novel_IDs", []) if management_data else [],
                            "Anime_IDs": management_data.get("Anime_IDs", []) if management_data else [],
                            "Drama_IDs": management_data.get("Drama_IDs", []) if management_data else [],
                            # 🔒 Lock status, straight from the management database
                            "field_lock_status": management_data.get("field_lock_status", {}) if management_data else {},
                            # 📊 Day-over-day comparison data (important: includes the play-count delta)
                            "timeline_data": {
                                "is_new": item["is_new"],
                                "rank_change": rank_change,
                                "play_vv_change": item["play_vv_change"],
                                "play_vv_change_rate": item["play_vv_change_rate"],
                                "yesterday_rank": item["yesterday_data"].get("rank", 0) if not item["is_new"] else 0,
                                "yesterday_play_vv": item["yesterday_data"].get("play_vv", 0) if not item["is_new"] else 0
                            }
                        }
                        comprehensive_ranking["data"].append(ranking_item)
                        rank += 1  # advance the rank counter
                    # Stamp each calculation with a unique timestamp
                    current_timestamp = datetime.now()
                    comprehensive_ranking["created_at"] = current_timestamp
                    comprehensive_ranking["calculation_id"] = f"{today_str}_{current_timestamp.strftime('%H%M%S')}"
                    # Count how many calculations already exist for today
                    existing_count = rankings_collection.count_documents({
                        "date": today_str,
                        "type": "comprehensive"
                    })
                    comprehensive_ranking["calculation_sequence"] = existing_count + 1
                    # Always insert a new ranking document to keep every calculation
                    # (note: today's documents are deleted at the start of this method,
                    # so the sequence effectively restarts at 1 on each run)
                    rankings_collection.insert_one(comprehensive_ranking)
                    logging.info(f"📝 Created today's ranking (calculation #{existing_count + 1}, with the latest deltas)")
                    logging.info(f"🔖 Calculation ID: {comprehensive_ranking['calculation_id']}")
                    # 📊 Data-completeness check: count items whose details were
                    # successfully pulled from Rankings_management
                    total_items = len(comprehensive_ranking["data"])
                    items_with_management_data = 0
                    items_with_manufacturing = 0
                    items_with_copyright = 0
                    for item in comprehensive_ranking["data"]:
                        # Did this item get any data from Rankings_management?
                        if item.get("batch_id") or item.get("desc") or item.get("Manufacturing_Field") or item.get("Copyright_field"):
                            items_with_management_data += 1
                        if item.get("Manufacturing_Field"):
                            items_with_manufacturing += 1
                        if item.get("Copyright_field"):
                            items_with_copyright += 1
                    print("Data-completeness stats:")
                    print(f"  Total items: {total_items}")
                    print(f"  Items with Rankings_management details: {items_with_management_data}")
                    print(f"  Items with Manufacturing_Field: {items_with_manufacturing}")
                    print(f"  Items with Copyright_field: {items_with_copyright}")
                    logging.info(f"📊 Data completeness: {total_items} items total, {items_with_management_data} with details, Manufacturing_Field: {items_with_manufacturing}, Copyright_field: {items_with_copyright}")
                    # Summary stats
                    new_count = sum(1 for item in comprehensive_ranking["data"] if item["timeline_data"]["is_new"])
                    print("✅ Day-over-day comparison ranking generated successfully")
                    print(f"📊 {len(comprehensive_ranking['data'])} records in total")
                    print(f"🆕 {new_count} new entries")
                    print(f"🔄 Comparison base date: {yesterday_str}")
                    logging.info("✅ Day-over-day comparison ranking generated successfully")
                    logging.info(f"📊 {len(comprehensive_ranking['data'])} records in total")
                    logging.info(f"🆕 {new_count} new entries")
                    logging.info(f"🔄 Comparison base date: {yesterday_str}")
                    return True
                else:
                    logging.warning("⚠️ Ranking generation failed: no data for today")
                    return False
            except Exception as e:
                logging.error(f"💥 Exception while generating the day-over-day ranking: {e}")
                import traceback
                logging.error(f"Full traceback: {traceback.format_exc()}")
                return False
        except Exception as e:
            logging.error(f"💥 Exception while generating the ranking: {e}")
            import traceback
            logging.error(f"Full traceback: {traceback.format_exc()}")
    def check_and_sync_missing_fields(self):
        """Check for missing fields in today's data and sync them in real time."""
        try:
            from database import db
            # Only check today's data
            today = date.today()
            today_str = today.strftime('%Y-%m-%d')
            # First check whether Rankings_management has any data at all
            rankings_management_collection = db['Rankings_management']
            management_count = rankings_management_collection.count_documents({})
            if management_count == 0:
                # Rankings_management is empty: nothing has been scraped yet, so return
                return
            rankings_collection = db['Ranking_storage']
            key_fields = ['Manufacturing_Field', 'Copyright_field', 'desc', 'series_author']
            # Check whether any of today's records are missing key fields
            missing_conditions = []
            for field in key_fields:
                missing_conditions.extend([
                    {field: {"$exists": False}},
                    {field: None},
                    {field: ""}
                ])
            today_missing_count = rankings_collection.count_documents({
                "date": today_str,
                "$or": missing_conditions
            })
            # Nothing missing today: return silently
            if today_missing_count == 0:
                return
            logging.info(f"🔍 Today has {today_missing_count} records with missing fields; Rankings_management holds {management_count} records. Starting real-time sync...")
            # Only process today's data
            dates_to_check = [today_str]
            total_missing = 0
            total_synced = 0
            for check_date in dates_to_check:
                # Query the records with missing fields for this date
                # (this pass checks a wider field set, including the classification ID lists)
                key_fields = ['Manufacturing_Field', 'Copyright_field', 'desc', 'series_author', 'Novel_IDs', 'Anime_IDs', 'Drama_IDs']
                missing_conditions = []
                for field in key_fields:
                    missing_conditions.extend([
                        {field: {"$exists": False}},
                        {field: None},
                        {field: ""}
                    ])
                missing_query = {
                    "date": check_date,
                    "$or": missing_conditions
                }
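                # Resulting filter for a single field (illustrative):
                #   {"date": check_date, "$or": [{"desc": {"$exists": False}},
                #                                {"desc": None}, {"desc": ""}, ...]}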
                missing_count = rankings_collection.count_documents(missing_query)
                # Per-field missing-count statistics
                field_stats = {}
                total_items = rankings_collection.count_documents({"date": check_date})
                for field in key_fields:
                    missing_field_count = rankings_collection.count_documents({
                        "date": check_date,
                        "$or": [
                            {field: {"$exists": False}},
                            {field: None},
                            {field: ""}
                        ]
                    })
                    field_stats[field] = {
                        "missing": missing_field_count,
                        "completion_rate": ((total_items - missing_field_count) / total_items * 100) if total_items > 0 else 0
                    }
                if missing_count > 0:
                    logging.info(f"📅 Today ({check_date}): {missing_count} records with missing fields (out of {total_items})")
                    # Log the per-field breakdown
                    for field, stats in field_stats.items():
                        if stats["missing"] > 0:
                            logging.info(f"  - {field}: {stats['missing']} missing ({stats['completion_rate']:.1f}% complete)")
                    total_missing += missing_count
                    # Attempt the sync
                    try:
                        from routers.rank_api_routes import sync_ranking_storage_fields
                        # Use the improved retry mechanism
                        sync_result = sync_ranking_storage_fields(
                            target_date=check_date,
                            force_update=False,
                            max_retries=2,   # two retries for the periodic check
                            retry_delay=15   # 15-second retry interval
                        )
                        if sync_result.get("success", False):
                            stats = sync_result.get("stats", {})
                            synced = stats.get("updated_items", 0)
                            retry_count = stats.get("retry_count", 0)
                            pending_final = stats.get("pending_items_final", 0)
                            total_synced += synced
                            if synced > 0:
                                logging.info(f"✅ Today ({check_date}): synced {synced} records")
                            if retry_count > 0:
                                logging.info(f"🔄 Today ({check_date}): used {retry_count} retries")
                            if pending_final > 0:
                                logging.warning(f"⚠️ Today ({check_date}): {pending_final} records still not found in Rankings_management")
                        else:
                            logging.warning(f"⚠️ Today ({check_date}): sync failed - {sync_result.get('message', '')}")
                    except Exception as sync_error:
                        logging.error(f"💥 Today ({check_date}): error during sync - {sync_error}")
                else:
                    if total_items > 0:
                        logging.info(f"📅 {check_date}: all fields complete ({total_items} records in total)")
                        # Log the completeness breakdown
                        for field, stats in field_stats.items():
                            logging.info(f"  - {field}: {stats['completion_rate']:.1f}% complete")
                    else:
                        logging.info(f"📅 {check_date}: no data")
            if total_missing > 0:
                logging.info(f"🔍 Today's sync finished: {total_missing} missing records found, {total_synced} synced")
                print(f"🔍 Field sync for today: {total_missing} missing, {total_synced} synced")
            # When nothing is missing today, stay silent (no log output)
        except Exception as e:
            logging.error(f"💥 Exception while checking for missing fields: {e}")
            import traceback
            logging.error(f"Full traceback: {traceback.format_exc()}")
    def setup_schedule(self):
        """Register the scheduled jobs."""
        # Scrape Douyin play counts every hour, on the hour
        schedule.every().hour.at(":00").do(self.run_douyin_scraper)
        # Check for missing fields (and sync them) every minute, in real time
        schedule.every(1).minutes.do(self.check_and_sync_missing_fields)
        logging.info("⏰ Scheduler set: Douyin play-count scraping every hour, on the hour")
        logging.info("⏰ Scheduler set: missing-field check and sync every minute (real-time mode)")
    def show_next_run(self):
        """Display the next scheduled run time."""
        jobs = schedule.get_jobs()
        if jobs:
            # jobs[0] is the scraping job (it is registered first in setup_schedule)
            next_run = jobs[0].next_run
            current_time = datetime.now()
            wait_seconds = (next_run - current_time).total_seconds()
            wait_minutes = int(wait_seconds // 60)
            wait_hours = int(wait_minutes // 60)
            remaining_minutes = wait_minutes % 60
            print(f"💡 Scheduler running; next run at {next_run.strftime('%Y-%m-%d %H:%M:%S')} ({wait_hours}h{remaining_minutes}m left)")
            print(f"⏳ Time until next run: {wait_minutes} minutes ({int(wait_seconds)} seconds)")
            logging.info(f"💡 Scheduler running; next run at {next_run.strftime('%Y-%m-%d %H:%M:%S')} ({wait_hours}h{remaining_minutes}m left)")
            logging.info(f"⏳ Time until next run: {wait_minutes} minutes ({int(wait_seconds)} seconds)")
    def run_once(self):
        """Run once immediately."""
        logging.info("🔧 One-shot mode...")
        self.run_douyin_scraper()
    def run_test(self):
        """Test mode: run once immediately."""
        logging.info("🧪 Test mode - running the Douyin play-count scraping task now...")
        self.run_douyin_scraper()
    def run_ranking_only(self):
        """Generate the rankings only (no scraping)."""
        logging.info("📊 Ranking-only mode...")
        self.generate_daily_rankings()
    def start_scheduler(self):
        """Start the scheduler loop."""
        self.is_running = True
        last_status_time = int(time.time())  # start from now; the first status line appears after one minute
        print("🚀 Douyin play-count auto-scraping scheduler started")
        print("⏰ Schedule: Douyin play-count scraping every hour, on the hour")
        print("⏹️ Press Ctrl+C to stop the scheduler")
        logging.info("🚀 Douyin play-count auto-scraping scheduler started")
        logging.info("⏰ Schedule: Douyin play-count scraping every hour, on the hour")
        logging.info("⏹️ Press Ctrl+C to stop the scheduler")
        # Show the next run time once at startup
        self.show_next_run()
        try:
            while self.is_running:
                schedule.run_pending()
                time.sleep(1)
                # Print a status line once per minute
                current_time = int(time.time())
                if current_time - last_status_time >= 60:
                    self.show_next_run()
                    last_status_time = current_time
        except KeyboardInterrupt:
            print("\n⏹️ Scheduler stopped")
            logging.info("⏹️ Scheduler stopped")
            self.is_running = False
def main():
    """Entry point."""
    try:
        parser = argparse.ArgumentParser(description='Douyin play-count auto-scraping scheduler')
        parser.add_argument('--test', action='store_true', help='test mode - run once immediately')
        parser.add_argument('--once', action='store_true', help='run once immediately, then exit')
        parser.add_argument('--ranking-only', action='store_true', help='generate the rankings only (no scraping)')
        args = parser.parse_args()
        # Configure logging - quiet mode only applies to scheduler mode
        quiet_mode = not (args.test or args.once or args.ranking_only)
        setup_logging(quiet_mode=quiet_mode)
        print("Initializing the scheduler...")
        scheduler = DouyinAutoScheduler()
        if args.test:
            scheduler._is_timer_mode = False
            print("Running in test mode...")
            scheduler.run_test()
        elif args.once:
            scheduler._is_timer_mode = False
            print("Running in one-shot mode...")
            scheduler.run_once()
        elif args.ranking_only:
            scheduler._is_timer_mode = False
            print("Running in ranking-only mode...")
            scheduler.run_ranking_only()
        else:
            scheduler._is_timer_mode = True
            print("Starting scheduler mode...")
            # Show the scheduler info (print, so it is always visible)
            current_time = datetime.now()
            print(f"🕐 Current time: {current_time.strftime('%Y-%m-%d %H:%M:%S')}")
            print("⏰ Schedule: Douyin play-count scraping every hour, on the hour")
            # Compute the next on-the-hour run time
            next_hour = current_time.replace(minute=0, second=0, microsecond=0)
            if current_time.minute > 0 or current_time.second > 0:
                # timedelta handles the midnight rollover that replace(hour=...) cannot
                # (replace(hour=hour + 1) would raise ValueError during the 23:00 hour)
                next_hour += timedelta(hours=1)
            wait_seconds = (next_hour - current_time).total_seconds()
            wait_minutes = int(wait_seconds // 60)
            print(f"⏰ Next run: {next_hour.strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"⏳ Time until next run: {wait_minutes} minutes ({int(wait_seconds)} seconds)")
print("💡 定时器正在等待中,将在整点自动执行任务...")
print("⏹️ 按 Ctrl+C 停止定时器")
scheduler.setup_schedule()
scheduler.start_scheduler()
print("程序执行完成")
except Exception as e:
print(f"程序执行出错: {e}")
import traceback
traceback.print_exc()
return 1
return 0
if __name__ == '__main__':
    sys.exit(main())