844 lines
43 KiB
Python
844 lines
43 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
抖音播放量自动抓取定时器 - 跨平台版本
|
||
|
||
功能:
|
||
- 每晚自动执行抖音播放量抓取任务
|
||
- 数据抓取完成后自动生类榜单
|
||
- 支持Windows、macOS、Linux
|
||
- 自动保存数据到MongoDB
|
||
|
||
使用方法:
|
||
- 正常模式:python Timer_worker.py(启动定时器)
|
||
- 测试模式:python Timer_worker.py --test(立即执行一次)
|
||
- 单次执行:python Timer_worker.py --once(立即执行一次并退出)
|
||
- 仅生成榜单:python Timer_worker.py --ranking-only(仅生成榜单,不抓取数据)
|
||
"""
|
||
|
||
import schedule
|
||
import time
|
||
import sys
|
||
import os
|
||
import logging
|
||
import argparse
|
||
from datetime import datetime, date, timedelta
|
||
import config
|
||
|
||
# 添加项目路径到 Python 路径
|
||
sys.path.append(os.path.join(os.path.dirname(__file__), 'handlers', 'Rankings'))
|
||
from handlers.Rankings.rank_data_scraper import DouyinPlayVVScraper
|
||
|
||
|
||
|
||
# 配置日志的函数
|
||
def setup_timer_environment():
|
||
"""设置定时器相关的环境变量"""
|
||
config.apply_timer_environment()
|
||
for key, value in config.TIMER_ENV_CONFIG.items():
|
||
logging.info(f"设置环境变量: {key}={value}")
|
||
|
||
def setup_logging(quiet_mode=False):
|
||
"""设置日志配置"""
|
||
# 确保logs目录存在
|
||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||
logs_dir = os.path.join(script_dir, 'handlers', 'Rankings', 'logs')
|
||
os.makedirs(logs_dir, exist_ok=True)
|
||
|
||
# 在安静模式下,只记录WARNING及以上级别的日志到控制台
|
||
console_level = logging.WARNING if quiet_mode else logging.INFO
|
||
|
||
logging.basicConfig(
|
||
level=logging.INFO, # 文件日志仍然记录所有INFO级别
|
||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||
handlers=[
|
||
logging.FileHandler(os.path.join(logs_dir, 'scheduler.log'), encoding='utf-8'),
|
||
logging.StreamHandler()
|
||
]
|
||
)
|
||
|
||
# 如果是安静模式,调整控制台处理器的级别
|
||
if quiet_mode:
|
||
for handler in logging.getLogger().handlers:
|
||
if isinstance(handler, logging.StreamHandler) and not isinstance(handler, logging.FileHandler):
|
||
handler.setLevel(console_level)
|
||
|
||
class DouyinAutoScheduler:
|
||
def __init__(self):
|
||
self.is_running = False
|
||
# 创建logger实例
|
||
self.logger = logging.getLogger(__name__)
|
||
|
||
def _normalize_play_vv(self, play_vv):
|
||
"""标准化播放量数据类型,将字符串转换为数字"""
|
||
if isinstance(play_vv, str):
|
||
try:
|
||
return int(play_vv.replace(',', '').replace('万', '0000').replace('亿', '00000000'))
|
||
except:
|
||
return 0
|
||
elif not isinstance(play_vv, (int, float)):
|
||
return 0
|
||
return play_vv
|
||
|
||
def check_browser_login_status(self):
|
||
"""检查浏览器登录状态,如果没有登录则提示用户登录"""
|
||
try:
|
||
import os
|
||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||
profile_dir = os.path.join(script_dir, 'config', 'chrome_profile_timer', 'douyin_persistent')
|
||
|
||
|
||
# 检查配置文件是否为空(可能未登录)
|
||
import glob
|
||
profile_files = glob.glob(os.path.join(profile_dir, "*"))
|
||
if len(profile_files) < 5: # 如果文件太少,可能未登录
|
||
print("⚠️ 检测到定时器浏览器可能未登录")
|
||
print(" 请在浏览器中完成抖音登录,并导航到【我的】→【收藏】→【合集】页面")
|
||
print(" 完成后按回车键继续...")
|
||
input()
|
||
else:
|
||
print("✅ 定时器浏览器已配置,继续执行...")
|
||
|
||
except Exception as e:
|
||
logging.warning(f"检查浏览器登录状态时出错: {e}")
|
||
print("⚠️ 检查浏览器状态失败,请确保浏览器已正确配置")
|
||
print(" 完成后按回车键继续...")
|
||
input()
|
||
|
||
def _cleanup_chrome_processes(self):
|
||
"""清理可能占用配置文件的Chrome进程"""
|
||
try:
|
||
import psutil
|
||
import os
|
||
|
||
# 获取当前配置文件路径
|
||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||
profile_dir = os.path.join(script_dir, 'config', 'chrome_profile_timer', 'douyin_persistent')
|
||
|
||
# 查找使用该配置文件的Chrome进程
|
||
killed_processes = []
|
||
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
|
||
try:
|
||
if proc.info['name'] and 'chrome' in proc.info['name'].lower():
|
||
cmdline = proc.info['cmdline']
|
||
if cmdline and any(profile_dir in arg for arg in cmdline):
|
||
proc.terminate()
|
||
killed_processes.append(proc.info['pid'])
|
||
logging.info(f'终止占用配置文件的Chrome进程: PID {proc.info["pid"]}')
|
||
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
|
||
continue
|
||
|
||
# 等待进程终止
|
||
if killed_processes:
|
||
import time
|
||
time.sleep(2)
|
||
|
||
return len(killed_processes) > 0
|
||
|
||
except ImportError:
|
||
# 如果没有psutil,跳过清理以避免影响其他脚本实例
|
||
logging.warning('psutil 不可用,跳过进程清理(避免全局终止 Chrome)')
|
||
return False
|
||
except Exception as e:
|
||
logging.warning(f'清理Chrome进程时出错: {e}')
|
||
return False
|
||
|
||
def run_douyin_scraper(self):
|
||
"""执行抖音播放量抓取任务"""
|
||
try:
|
||
logging.warning("🚀 开始执行抖音播放量抓取任务...")
|
||
|
||
# 设置环境变量,确保定时器模式和自动模式
|
||
setup_timer_environment()
|
||
|
||
# 直接创建并运行 DouyinPlayVVScraper 实例
|
||
scraper = DouyinPlayVVScraper(
|
||
start_url="https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation",
|
||
auto_continue=True,
|
||
duration_s=60 # 增加到60秒,给更多时间收集数据
|
||
)
|
||
|
||
print("开始执行抓取任务...")
|
||
logging.info("📁 开始执行抓取任务...")
|
||
scraper.run()
|
||
|
||
print("抖音播放量抓取任务执行成功")
|
||
logging.info("✅ 抖音播放量抓取任务执行成功")
|
||
|
||
# 数据抓取完成后,自动生成当日榜单
|
||
self.generate_daily_rankings()
|
||
|
||
# 任务完成后立即显示下次执行时间
|
||
print("🎯 任务完成,准备下次执行...")
|
||
self.show_next_run()
|
||
print("💡 定时器正在等待中,将在整点自动执行任务...")
|
||
|
||
logging.info("🎯 任务完成,准备下次执行...")
|
||
logging.info("💡 定时器正在等待中,将在整点自动执行任务...")
|
||
|
||
except Exception as e:
|
||
logging.error(f"💥 执行任务时发生异常: {e}")
|
||
import traceback
|
||
logging.error(f"详细错误信息: {traceback.format_exc()}")
|
||
|
||
def generate_daily_rankings(self):
|
||
"""生成每日榜单数据(基于时间轴对比)"""
|
||
try:
|
||
from database import db
|
||
from datetime import timedelta
|
||
|
||
# 获取集合
|
||
douyin_collection = db['Ranking_storage_list'] # 使用定时器抓取的数据
|
||
rankings_collection = db['Ranking_storage']
|
||
|
||
today = date.today()
|
||
yesterday = today - timedelta(days=1)
|
||
today_str = today.strftime('%Y-%m-%d')
|
||
yesterday_str = yesterday.strftime('%Y-%m-%d')
|
||
|
||
logging.info(f"📅 正在生成 {today_str} 的榜单(对比 {yesterday_str})...")
|
||
|
||
# 删除当天已有的榜单数据
|
||
rankings_collection.delete_many({"date": today_str})
|
||
print(f"🗑️ 已清理 {today_str} 的旧榜单数据")
|
||
logging.info(f"🗑️ 已清理 {today_str} 的旧榜单数据")
|
||
|
||
# 获取今天和昨天的榜单数据进行对比
|
||
try:
|
||
print("🔄 正在生成时间轴对比榜单...")
|
||
logging.info("🔄 正在生成时间轴对比榜单...")
|
||
|
||
# 获取最新批次的数据
|
||
latest_batch = douyin_collection.find_one(sort=[("batch_time", -1)])
|
||
if not latest_batch:
|
||
logging.warning("⚠️ 未找到任何数据")
|
||
return False
|
||
|
||
latest_batch_time = latest_batch.get("batch_time")
|
||
logging.info(f"📊 找到最新批次时间: {latest_batch_time}")
|
||
|
||
# 只获取最新批次的数据
|
||
today_videos_raw = list(douyin_collection.find({"batch_time": latest_batch_time}).sort("play_vv", -1))
|
||
logging.info(f"📊 最新批次数据数量: {len(today_videos_raw)}")
|
||
|
||
# 按短剧ID去重,每个短剧只保留播放量最高的一条
|
||
# 🚫 过滤掉空的或无效的mix_id和播放量为0的记录
|
||
unique_videos = {}
|
||
for video in today_videos_raw:
|
||
mix_id = video.get("mix_id", "").strip()
|
||
mix_name = video.get("mix_name", "").strip()
|
||
play_vv = video.get("play_vv", 0)
|
||
|
||
# 过滤掉空的或无效的mix_id
|
||
if not mix_id or mix_id == "" or mix_id.lower() == "null":
|
||
continue
|
||
|
||
# 注意:播放量为0的数据也会被保留,可能是新发布的短剧
|
||
if play_vv <= 0:
|
||
logging.warning(f"⚠️ 发现播放量为0的数据: mix_name={mix_name}, play_vv={play_vv},仍会保留")
|
||
|
||
if mix_id not in unique_videos or play_vv > unique_videos[mix_id].get("play_vv", 0):
|
||
unique_videos[mix_id] = video
|
||
|
||
today_videos = list(unique_videos.values())
|
||
|
||
logging.info(f"📊 今日数据去重后:{len(today_videos)} 个独特短剧(原始数据:{len(today_videos_raw)} 条)")
|
||
|
||
# 从Ranking_storage_list中获取昨天最后一次抓取的数据
|
||
yesterday_start = datetime.combine(yesterday, datetime.min.time())
|
||
yesterday_end = datetime.combine(yesterday, datetime.max.time())
|
||
|
||
# 获取昨天的最后一次抓取数据(按batch_time排序取最新的)
|
||
yesterday_latest_batch = douyin_collection.find_one({
|
||
"batch_time": {
|
||
"$gte": yesterday_start,
|
||
"$lte": yesterday_end
|
||
}
|
||
}, sort=[("batch_time", -1)])
|
||
|
||
yesterday_data = {}
|
||
if yesterday_latest_batch:
|
||
yesterday_batch_time = yesterday_latest_batch.get("batch_time")
|
||
logging.info(f"📊 找到昨天最后一次抓取时间: {yesterday_batch_time}")
|
||
|
||
# 获取昨天最后一次抓取的所有数据
|
||
yesterday_videos_raw = list(douyin_collection.find({
|
||
"batch_time": yesterday_batch_time
|
||
}).sort("play_vv", -1))
|
||
|
||
# 按短剧ID去重,每个短剧只保留播放量最高的一条
|
||
# 🚫 过滤掉空的或无效的mix_id
|
||
unique_yesterday_videos = {}
|
||
for video in yesterday_videos_raw:
|
||
mix_id = video.get("mix_id", "").strip()
|
||
mix_name = video.get("mix_name", "").strip()
|
||
play_vv = video.get("play_vv", 0)
|
||
|
||
# 过滤掉空的或无效的mix_id
|
||
if not mix_id or mix_id == "" or mix_id.lower() == "null":
|
||
continue
|
||
|
||
# 注意:播放量为0的数据也会被保留,可能是新发布的短剧
|
||
if play_vv <= 0:
|
||
logging.warning(f"⚠️ 昨天数据中发现播放量为0: mix_name={mix_name}, play_vv={play_vv},仍会保留")
|
||
|
||
if mix_id not in unique_yesterday_videos or play_vv > unique_yesterday_videos[mix_id].get("play_vv", 0):
|
||
unique_yesterday_videos[mix_id] = video
|
||
|
||
# 将昨天的数据转换为字典,以短剧ID为键
|
||
for mix_id, video in unique_yesterday_videos.items():
|
||
yesterday_data[mix_id] = {
|
||
"rank": 0, # 原始数据没有排名,设为0
|
||
"play_vv": video.get("play_vv", 0),
|
||
"video_id": str(video.get("_id", ""))
|
||
}
|
||
|
||
logging.info(f"📊 找到昨天的原始数据,共 {len(yesterday_data)} 个短剧(原始数据:{len(yesterday_videos_raw)} 条)")
|
||
else:
|
||
logging.info("📊 未找到昨天的原始数据,将作为首次生成")
|
||
|
||
if today_videos:
|
||
# 先计算所有视频的播放量差值
|
||
videos_with_growth = []
|
||
for video in today_videos:
|
||
video_id = str(video.get("_id", ""))
|
||
current_play_vv = video.get("play_vv", 0)
|
||
|
||
# 计算与昨天的对比数据
|
||
play_vv_change = 0
|
||
play_vv_change_rate = 0
|
||
is_new = True
|
||
|
||
mix_id = video.get("mix_id", "")
|
||
if mix_id in yesterday_data:
|
||
is_new = False
|
||
yesterday_play_vv = yesterday_data[mix_id]["play_vv"]
|
||
|
||
# 计算播放量变化
|
||
play_vv_change = current_play_vv - yesterday_play_vv
|
||
if yesterday_play_vv > 0:
|
||
play_vv_change_rate = round((play_vv_change / yesterday_play_vv) * 100, 2)
|
||
|
||
# 创建包含增长数据的视频项
|
||
video_with_growth = {
|
||
"video": video,
|
||
"play_vv_change": play_vv_change,
|
||
"play_vv_change_rate": play_vv_change_rate,
|
||
"is_new": is_new,
|
||
"yesterday_data": yesterday_data.get(mix_id, {})
|
||
}
|
||
videos_with_growth.append(video_with_growth)
|
||
|
||
# 按播放量差值降序排序(差值越大排名越靠前)
|
||
videos_with_growth.sort(key=lambda x: x["play_vv_change"], reverse=True)
|
||
|
||
comprehensive_ranking = {
|
||
"date": today_str,
|
||
"type": "comprehensive",
|
||
"name": "播放量增长榜单",
|
||
"description": f"基于 {yesterday_str} 和 {today_str} 播放量差值排序的榜单(差值越大排名越靠前)",
|
||
"comparison_date": yesterday_str,
|
||
"total_videos": len(videos_with_growth),
|
||
"data": []
|
||
}
|
||
|
||
# 获取Rankings_management集合用于补充详细信息
|
||
rankings_management_collection = db['Rankings_management']
|
||
|
||
# 生成排序后的榜单数据
|
||
rank = 1 # 使用独立的排名计数器
|
||
for item in videos_with_growth:
|
||
video = item["video"]
|
||
video_id = str(video.get("_id", ""))
|
||
current_play_vv = video.get("play_vv", 0)
|
||
mix_name = video.get("mix_name", "").strip()
|
||
|
||
# 🚫 跳过无效数据:确保mix_name不为空
|
||
# 注意:播放量为0的数据也会被保留,可能是新发布的短剧
|
||
if not mix_name or mix_name == "" or mix_name.lower() == "null":
|
||
self.logger.warning(f"跳过空的mix_name记录,video_id: {video_id}")
|
||
continue
|
||
|
||
if current_play_vv <= 0:
|
||
self.logger.warning(f"⚠️ 榜单中发现播放量为0的记录: mix_name={mix_name}, play_vv={current_play_vv},仍会保留")
|
||
|
||
# 计算排名变化(基于昨天的排名)
|
||
rank_change = 0
|
||
if not item["is_new"] and item["yesterday_data"]:
|
||
yesterday_rank = item["yesterday_data"].get("rank", 0)
|
||
rank_change = yesterday_rank - rank # 使用当前排名计数器
|
||
|
||
# 🔍 从Rankings_management获取详细信息(按日期和mix_name查询)
|
||
today_str = datetime.now().strftime('%Y-%m-%d')
|
||
management_data = rankings_management_collection.find_one({
|
||
"mix_name": mix_name,
|
||
"$or": [
|
||
{"created_at": {"$gte": datetime.strptime(today_str, '%Y-%m-%d'),
|
||
"$lt": datetime.strptime(today_str, '%Y-%m-%d') + timedelta(days=1)}},
|
||
{"last_updated": {"$gte": datetime.strptime(today_str, '%Y-%m-%d'),
|
||
"$lt": datetime.strptime(today_str, '%Y-%m-%d') + timedelta(days=1)}}
|
||
]
|
||
})
|
||
|
||
# 🔑 如果今天没有数据,查询昨天的 Rankings_management(仅用于获取分类字段和锁定状态)
|
||
classification_data = None
|
||
if not management_data:
|
||
# 查询昨天的 Rankings_management
|
||
yesterday_start = datetime.strptime(yesterday_str, '%Y-%m-%d')
|
||
yesterday_end = yesterday_start + timedelta(days=1)
|
||
classification_data = rankings_management_collection.find_one({
|
||
"mix_name": mix_name,
|
||
"$or": [
|
||
{"created_at": {"$gte": yesterday_start, "$lt": yesterday_end}},
|
||
{"last_updated": {"$gte": yesterday_start, "$lt": yesterday_end}}
|
||
]
|
||
})
|
||
if classification_data:
|
||
novel_ids = classification_data.get('Novel_IDs', [])
|
||
anime_ids = classification_data.get('Anime_IDs', [])
|
||
drama_ids = classification_data.get('Drama_IDs', [])
|
||
logging.info(f"📋 今天没有数据,从昨天的 Rankings_management 获取分类: {mix_name}")
|
||
logging.info(f" - Novel_IDs: {novel_ids}")
|
||
logging.info(f" - Anime_IDs: {anime_ids}")
|
||
logging.info(f" - Drama_IDs: {drama_ids}")
|
||
logging.info(f" - last_updated: {classification_data.get('last_updated')}")
|
||
else:
|
||
logging.warning(f"⚠️ 今天和昨天都没有数据: {mix_name}")
|
||
|
||
ranking_item = {
|
||
# 🎯 核心榜单字段
|
||
"rank": rank, # 使用排名计数器
|
||
"title": mix_name,
|
||
"mix_name": mix_name,
|
||
"play_vv": current_play_vv,
|
||
"series_author": video.get("series_author", ""),
|
||
"video_id": video_id,
|
||
"video_url": video.get("video_url", ""),
|
||
"cover_image_url": video.get("cover_image_url", ""),
|
||
"playcount_str": video.get("playcount", ""),
|
||
|
||
# 📋 从Rankings_management获取的详细字段
|
||
"batch_id": management_data.get("batch_id", "") if management_data else "",
|
||
"batch_time": management_data.get("batch_time") if management_data else None,
|
||
"item_sequence": management_data.get("item_sequence", 0) if management_data else 0,
|
||
"mix_id": video.get("mix_id", ""), # 直接从原始数据获取mix_id
|
||
"playcount": management_data.get("playcount", "") if management_data else "",
|
||
"request_id": management_data.get("request_id", "") if management_data else "",
|
||
"cover_image_url_original": management_data.get("cover_image_url_original", "") if management_data else "",
|
||
"cover_upload_success": management_data.get("cover_upload_success", True) if management_data else True,
|
||
"cover_backup_urls": management_data.get("cover_backup_urls", []) if management_data else [],
|
||
"desc": management_data.get("desc", "") if management_data else "",
|
||
"updated_to_episode": management_data.get("updated_to_episode", 0) if management_data else 0,
|
||
"episode_video_ids": management_data.get("episode_video_ids", []) if management_data else [],
|
||
"episode_details": management_data.get("episode_details", []) if management_data else [],
|
||
"data_status": management_data.get("data_status", "") if management_data else "",
|
||
"realtime_saved": management_data.get("realtime_saved", True) if management_data else True,
|
||
"created_at": management_data.get("created_at") if management_data else None,
|
||
"last_updated": management_data.get("last_updated") if management_data else None,
|
||
# 🎬 评论总结字段
|
||
"comments_summary": management_data.get("comments_summary", "") if management_data else "",
|
||
|
||
# 🔑 分类字段:区分今天数据和历史数据
|
||
# - 如果今天有数据:从今天的数据获取所有字段
|
||
# - 如果今天没有数据:只从历史记录获取分类字段和锁定状态,其他字段为空
|
||
# 注意:使用 .get() 的第二个参数确保即使字段不存在也会返回空字符串
|
||
"Manufacturing_Field": (management_data.get("Manufacturing_Field", "") if management_data else "") or "",
|
||
"Copyright_field": (management_data.get("Copyright_field", "") if management_data else "") or "",
|
||
"classification_type": (management_data.get("classification_type", "") if management_data else "") or "", # 新增:类型/元素(确保字段存在)
|
||
"release_date": (management_data.get("release_date", "") if management_data else "") or "", # 新增:上线日期(确保字段存在)
|
||
"Novel_IDs": (
|
||
management_data.get("Novel_IDs", []) if management_data
|
||
else (classification_data.get("Novel_IDs", []) if classification_data else [])
|
||
),
|
||
"Anime_IDs": (
|
||
management_data.get("Anime_IDs", []) if management_data
|
||
else (classification_data.get("Anime_IDs", []) if classification_data else [])
|
||
),
|
||
"Drama_IDs": (
|
||
management_data.get("Drama_IDs", []) if management_data
|
||
else (classification_data.get("Drama_IDs", []) if classification_data else [])
|
||
),
|
||
|
||
# 🔒 锁定状态:同样区分今天数据和历史数据
|
||
"field_lock_status": (
|
||
management_data.get("field_lock_status", {}) if management_data
|
||
else (classification_data.get("field_lock_status", {}) if classification_data else {})
|
||
),
|
||
|
||
# 📊 时间轴对比数据(重要:包含播放量差值)
|
||
"timeline_data": {
|
||
"is_new": item["is_new"],
|
||
"rank_change": rank_change,
|
||
"play_vv_change": item["play_vv_change"],
|
||
"play_vv_change_rate": item["play_vv_change_rate"],
|
||
"yesterday_rank": item["yesterday_data"].get("rank", 0) if not item["is_new"] else 0,
|
||
"yesterday_play_vv": item["yesterday_data"].get("play_vv", 0) if not item["is_new"] else 0
|
||
}
|
||
}
|
||
|
||
comprehensive_ranking["data"].append(ranking_item)
|
||
rank += 1 # 递增排名计数器
|
||
|
||
# 为每次计算添加唯一的时间戳,确保数据唯一性
|
||
current_timestamp = datetime.now()
|
||
comprehensive_ranking["created_at"] = current_timestamp
|
||
comprehensive_ranking["calculation_id"] = f"{today_str}_{current_timestamp.strftime('%H%M%S')}"
|
||
|
||
# 检查今天已有多少次计算
|
||
existing_count = rankings_collection.count_documents({
|
||
"date": today_str,
|
||
"type": "comprehensive"
|
||
})
|
||
|
||
comprehensive_ranking["calculation_sequence"] = existing_count + 1
|
||
|
||
# 总是插入新的榜单记录,保留所有历史计算数据
|
||
rankings_collection.insert_one(comprehensive_ranking)
|
||
logging.info(f"📝 创建了新的今日榜单数据(第{existing_count + 1}次计算,包含最新差值)")
|
||
logging.info(f"🔖 计算ID: {comprehensive_ranking['calculation_id']}")
|
||
|
||
# 📊 检查数据完整性:统计从Rankings_management成功获取详细信息的项目数量
|
||
total_items = len(comprehensive_ranking["data"])
|
||
items_with_management_data = 0
|
||
items_with_manufacturing = 0
|
||
items_with_copyright = 0
|
||
|
||
for item in comprehensive_ranking["data"]:
|
||
# 检查是否从Rankings_management获取到了数据
|
||
if item.get("batch_id") or item.get("desc") or item.get("Manufacturing_Field") or item.get("Copyright_field"):
|
||
items_with_management_data += 1
|
||
if item.get("Manufacturing_Field"):
|
||
items_with_manufacturing += 1
|
||
if item.get("Copyright_field"):
|
||
items_with_copyright += 1
|
||
|
||
print(f"数据完整性统计:")
|
||
print(f" 总项目数: {total_items}")
|
||
print(f" 从Rankings_management获取到详细信息: {items_with_management_data}")
|
||
print(f" 包含Manufacturing_Field: {items_with_manufacturing}")
|
||
print(f" 包含Copyright_field: {items_with_copyright}")
|
||
|
||
logging.info(f"📊 数据完整性: 总{total_items}项,获取详细信息{items_with_management_data}项,Manufacturing_Field: {items_with_manufacturing},Copyright_field: {items_with_copyright}")
|
||
|
||
# 统计信息
|
||
new_count = sum(1 for item in comprehensive_ranking["data"] if item["timeline_data"]["is_new"])
|
||
print(f"✅ 时间轴对比榜单生成成功")
|
||
print(f"📊 总计 {len(comprehensive_ranking['data'])} 条记录")
|
||
print(f"🆕 新上榜 {new_count} 条")
|
||
print(f"🔄 对比基准日期: {yesterday_str}")
|
||
|
||
logging.info(f"✅ 时间轴对比榜单生成成功")
|
||
logging.info(f"📊 总计 {len(comprehensive_ranking['data'])} 条记录")
|
||
logging.info(f"🆕 新上榜 {new_count} 条")
|
||
logging.info(f"🔄 对比基准日期: {yesterday_str}")
|
||
|
||
return True
|
||
else:
|
||
logging.warning("⚠️ 榜单生成失败:无今日数据")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logging.error(f"💥 生成时间轴对比榜单时发生异常: {e}")
|
||
import traceback
|
||
logging.error(f"详细错误信息: {traceback.format_exc()}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logging.error(f"💥 生成榜单时发生异常: {e}")
|
||
import traceback
|
||
logging.error(f"详细错误信息: {traceback.format_exc()}")
|
||
|
||
def check_and_sync_missing_fields(self):
|
||
"""实时检查并同步当天缺失字段"""
|
||
try:
|
||
from database import db
|
||
|
||
# 只检查当天的数据
|
||
today = date.today()
|
||
today_str = today.strftime('%Y-%m-%d')
|
||
|
||
# 首先检查 Rankings_management 是否有当天的数据
|
||
rankings_management_collection = db['Rankings_management']
|
||
management_count = rankings_management_collection.count_documents({})
|
||
|
||
if management_count == 0:
|
||
# Rankings_management 没有数据,说明还没有抓取,直接返回
|
||
return
|
||
|
||
rankings_collection = db['Ranking_storage']
|
||
key_fields = ['Manufacturing_Field', 'Copyright_field', 'desc', 'series_author']
|
||
|
||
# 检查今天是否有缺失字段的数据
|
||
missing_conditions = []
|
||
for field in key_fields:
|
||
missing_conditions.extend([
|
||
{field: {"$exists": False}},
|
||
{field: None},
|
||
{field: ""}
|
||
])
|
||
|
||
today_missing_count = rankings_collection.count_documents({
|
||
"date": today_str,
|
||
"$or": missing_conditions
|
||
})
|
||
|
||
# 如果今天没有缺失数据,静默返回
|
||
if today_missing_count == 0:
|
||
return
|
||
|
||
logging.info(f"🔍 检测到今天有 {today_missing_count} 条缺失字段,Rankings_management有 {management_count} 条数据,开始实时同步...")
|
||
|
||
# 只处理当天的数据
|
||
dates_to_check = [today_str]
|
||
|
||
total_missing = 0
|
||
total_synced = 0
|
||
|
||
for check_date in dates_to_check:
|
||
# 查询该日期缺失字段的数据
|
||
rankings_collection = db['Ranking_storage']
|
||
|
||
# 检查多个关键字段(包括新增的分类字段)
|
||
key_fields = ['Manufacturing_Field', 'Copyright_field', 'desc', 'series_author', 'Novel_IDs', 'Anime_IDs', 'Drama_IDs']
|
||
missing_conditions = []
|
||
|
||
for field in key_fields:
|
||
missing_conditions.extend([
|
||
{field: {"$exists": False}},
|
||
{field: None},
|
||
{field: ""}
|
||
])
|
||
|
||
missing_query = {
|
||
"date": check_date,
|
||
"$or": missing_conditions
|
||
}
|
||
|
||
missing_count = rankings_collection.count_documents(missing_query)
|
||
|
||
# 详细统计每个字段的缺失情况
|
||
field_stats = {}
|
||
total_items = rankings_collection.count_documents({"date": check_date})
|
||
|
||
for field in key_fields:
|
||
missing_field_count = rankings_collection.count_documents({
|
||
"date": check_date,
|
||
"$or": [
|
||
{field: {"$exists": False}},
|
||
{field: None},
|
||
{field: ""}
|
||
]
|
||
})
|
||
field_stats[field] = {
|
||
"missing": missing_field_count,
|
||
"completion_rate": ((total_items - missing_field_count) / total_items * 100) if total_items > 0 else 0
|
||
}
|
||
|
||
if missing_count > 0:
|
||
logging.info(f"📅 今日({check_date}): 发现 {missing_count} 条记录缺失字段(总计 {total_items} 条)")
|
||
|
||
# 输出详细的字段统计
|
||
for field, stats in field_stats.items():
|
||
if stats["missing"] > 0:
|
||
logging.info(f" - {field}: 缺失 {stats['missing']} 条 ({stats['completion_rate']:.1f}% 完整)")
|
||
|
||
total_missing += missing_count
|
||
|
||
# 尝试同步
|
||
try:
|
||
from routers.rank_api_routes import sync_ranking_storage_fields
|
||
|
||
# 使用改进的重试机制
|
||
sync_result = sync_ranking_storage_fields(
|
||
target_date=check_date,
|
||
force_update=False,
|
||
max_retries=2, # 定期检查时重试2次
|
||
retry_delay=15 # 15秒重试间隔
|
||
)
|
||
|
||
if sync_result.get("success", False):
|
||
stats = sync_result.get("stats", {})
|
||
synced = stats.get("updated_items", 0)
|
||
retry_count = stats.get("retry_count", 0)
|
||
pending_final = stats.get("pending_items_final", 0)
|
||
|
||
total_synced += synced
|
||
if synced > 0:
|
||
logging.info(f"✅ 今日({check_date}): 成功同步 {synced} 条记录")
|
||
|
||
if retry_count > 0:
|
||
logging.info(f"🔄 今日({check_date}): 使用了 {retry_count} 次重试")
|
||
|
||
if pending_final > 0:
|
||
logging.warning(f"⚠️ 今日({check_date}): {pending_final} 条记录在 Rankings_management 中仍未找到")
|
||
else:
|
||
logging.warning(f"⚠️ 今日({check_date}): 同步失败 - {sync_result.get('message', '')}")
|
||
|
||
except Exception as sync_error:
|
||
logging.error(f"💥 今日({check_date}): 同步过程出错 - {sync_error}")
|
||
else:
|
||
if total_items > 0:
|
||
logging.info(f"📅 {check_date}: 所有字段完整(总计 {total_items} 条记录)")
|
||
# 显示完整性统计
|
||
for field, stats in field_stats.items():
|
||
logging.info(f" - {field}: {stats['completion_rate']:.1f}% 完整")
|
||
else:
|
||
logging.info(f"📅 {check_date}: 无数据")
|
||
|
||
if total_missing > 0:
|
||
logging.info(f"🔍 当天同步完成:发现 {total_missing} 条缺失记录,成功同步 {total_synced} 条")
|
||
print(f"🔍 当天字段同步:发现 {total_missing} 条缺失,同步 {total_synced} 条")
|
||
else:
|
||
# 当天没有缺失数据时,不输出日志(静默模式)
|
||
pass
|
||
|
||
except Exception as e:
|
||
logging.error(f"💥 检查缺失字段时发生异常: {e}")
|
||
import traceback
|
||
logging.error(f"详细错误信息: {traceback.format_exc()}")
|
||
|
||
def setup_schedule(self):
|
||
"""设置定时任务"""
|
||
# 每小时的整点执行抖音播放量抓取
|
||
schedule.every().hour.at(":00").do(self.run_douyin_scraper)
|
||
|
||
# 每1分钟检查一次缺失字段并尝试同步(实时同步)
|
||
schedule.every(1).minutes.do(self.check_and_sync_missing_fields)
|
||
|
||
logging.info(f"⏰ 定时器已设置:每小时整点执行抖音播放量抓取")
|
||
logging.info(f"⏰ 定时器已设置:每1分钟检查缺失字段并同步(实时模式)")
|
||
|
||
def show_next_run(self):
|
||
"""显示下次执行时间"""
|
||
jobs = schedule.get_jobs()
|
||
if jobs:
|
||
next_run = jobs[0].next_run
|
||
current_time = datetime.now()
|
||
wait_seconds = (next_run - current_time).total_seconds()
|
||
wait_minutes = int(wait_seconds // 60)
|
||
wait_hours = int(wait_minutes // 60)
|
||
remaining_minutes = wait_minutes % 60
|
||
|
||
print(f"💡 定时器运行中,下次执行:{next_run.strftime('%Y-%m-%d %H:%M:%S')} (还有{wait_hours}h{remaining_minutes}m)")
|
||
print(f"⏳ 距离下次执行:{wait_minutes} 分钟 ({int(wait_seconds)} 秒)")
|
||
|
||
logging.info(f"💡 定时器运行中,下次执行:{next_run.strftime('%Y-%m-%d %H:%M:%S')} (还有{wait_hours}h{remaining_minutes}m)")
|
||
logging.info(f"⏳ 距离下次执行:{wait_minutes} 分钟 ({int(wait_seconds)} 秒)")
|
||
|
||
def run_once(self):
|
||
"""立即执行一次"""
|
||
logging.info("🔧 立即执行模式...")
|
||
self.run_douyin_scraper()
|
||
|
||
def run_test(self):
|
||
"""测试模式 - 立即执行一次"""
|
||
logging.info("🧪 测试模式 - 立即执行抖音播放量抓取任务...")
|
||
self.run_douyin_scraper()
|
||
|
||
def run_ranking_only(self):
|
||
"""仅生成榜单(不抓取数据)"""
|
||
logging.info("📊 仅生成榜单模式...")
|
||
self.generate_daily_rankings()
|
||
|
||
def start_scheduler(self):
|
||
"""启动定时器"""
|
||
self.is_running = True
|
||
last_status_time = int(time.time()) # 设置为当前时间,1分钟后开始显示状态
|
||
|
||
print("🚀 抖音播放量自动抓取定时器已启动")
|
||
print("⏰ 执行时间:每小时整点执行抖音播放量抓取")
|
||
print("⏹️ 按 Ctrl+C 停止定时器")
|
||
|
||
logging.info("🚀 抖音播放量自动抓取定时器已启动")
|
||
logging.info(f"⏰ 执行时间:每小时整点执行抖音播放量抓取")
|
||
logging.info("⏹️ 按 Ctrl+C 停止定时器")
|
||
|
||
# 启动时显示一次下次执行时间
|
||
self.show_next_run()
|
||
|
||
try:
|
||
while self.is_running:
|
||
schedule.run_pending()
|
||
time.sleep(1)
|
||
|
||
# 每1分钟显示一次状态
|
||
current_time = int(time.time())
|
||
if current_time - last_status_time >= 60: # 60秒 = 1分钟
|
||
self.show_next_run()
|
||
last_status_time = current_time
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n⏹️ 定时器已停止")
|
||
logging.info("\n⏹️ 定时器已停止")
|
||
self.is_running = False
|
||
|
||
def main():
|
||
"""主函数"""
|
||
try:
|
||
parser = argparse.ArgumentParser(description='抖音播放量自动抓取定时器')
|
||
parser.add_argument('--test', action='store_true', help='测试模式 - 立即执行一次')
|
||
parser.add_argument('--once', action='store_true', help='立即执行一次并退出')
|
||
parser.add_argument('--ranking-only', action='store_true', help='仅生成榜单(不抓取数据)')
|
||
|
||
args = parser.parse_args()
|
||
|
||
# 设置日志配置 - 只在定时器模式下启用静默模式
|
||
quiet_mode = not (args.test or args.once or args.ranking_only)
|
||
setup_logging(quiet_mode=quiet_mode)
|
||
|
||
print("正在初始化定时器...")
|
||
scheduler = DouyinAutoScheduler()
|
||
|
||
if args.test:
|
||
scheduler._is_timer_mode = False
|
||
print("执行测试模式...")
|
||
scheduler.run_test()
|
||
elif args.once:
|
||
scheduler._is_timer_mode = False
|
||
print("执行单次模式...")
|
||
scheduler.run_once()
|
||
elif args.ranking_only:
|
||
scheduler._is_timer_mode = False
|
||
print("执行榜单生成模式...")
|
||
scheduler.run_ranking_only()
|
||
else:
|
||
scheduler._is_timer_mode = True
|
||
print("启动定时器模式...")
|
||
|
||
# 显示定时器信息(使用print确保能看到)
|
||
from datetime import datetime
|
||
current_time = datetime.now()
|
||
print(f"🕐 当前时间:{current_time.strftime('%Y-%m-%d %H:%M:%S')}")
|
||
print(f"⏰ 执行规则:每小时整点执行抖音播放量抓取")
|
||
|
||
# 计算下次执行时间
|
||
next_hour = current_time.replace(minute=0, second=0, microsecond=0)
|
||
if current_time.minute > 0 or current_time.second > 0:
|
||
next_hour = next_hour.replace(hour=next_hour.hour + 1)
|
||
if next_hour.hour >= 24:
|
||
from datetime import timedelta
|
||
next_hour = next_hour.replace(hour=0) + timedelta(days=1)
|
||
|
||
wait_seconds = (next_hour - current_time).total_seconds()
|
||
wait_minutes = int(wait_seconds // 60)
|
||
|
||
print(f"⏰ 下次执行时间:{next_hour.strftime('%Y-%m-%d %H:%M:%S')}")
|
||
print(f"⏳ 距离下次执行:{wait_minutes} 分钟 ({int(wait_seconds)} 秒)")
|
||
print("💡 定时器正在等待中,将在整点自动执行任务...")
|
||
print("⏹️ 按 Ctrl+C 停止定时器")
|
||
|
||
scheduler.setup_schedule()
|
||
scheduler.start_scheduler()
|
||
|
||
print("程序执行完成")
|
||
|
||
except Exception as e:
|
||
print(f"程序执行出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return 1
|
||
|
||
return 0
|
||
|
||
if __name__ == '__main__':
|
||
main() |