Compare commits

..

2 Commits

Author SHA1 Message Date
9295e77cf1 主代码可以实时更新
定时器由于要进行播放量插值计算,所以要有固定的时间戳,还是统一保存。
2025-10-27 18:55:54 +08:00
e8baaa4ce9 优化定时器功能 2025-10-27 13:38:16 +08:00
8 changed files with 344 additions and 13719 deletions

View File

@ -32,6 +32,12 @@ from handlers.Rankings.rank_data_scraper import DouyinPlayVVScraper
# 配置日志的函数
def setup_timer_environment():
    """Apply the timer-related environment variables and log each assignment."""
    config.apply_timer_environment()
    for env_name, env_value in config.TIMER_ENV_CONFIG.items():
        logging.info(f"设置环境变量: {env_name}={env_value}")
def setup_logging(quiet_mode=False):
"""设置日志配置"""
# 确保logs目录存在
@ -101,8 +107,8 @@ class DouyinAutoScheduler:
try:
logging.warning("🚀 开始执行抖音播放量抓取任务...")
# 设置环境变量,确保自动模式
os.environ['AUTO_CONTINUE'] = '1'
# 设置环境变量,确保定时器模式和自动模式
setup_timer_environment()
# 直接创建并运行 DouyinPlayVVScraper 实例
scraper = DouyinPlayVVScraper(

View File

@ -18,6 +18,19 @@ LOG_DIR = 'logs'
# Scheduler configuration
# NOTE: "24:00" is not a valid HH:MM value on a 24-hour clock — both
# datetime.strptime("%H:%M") and the `schedule` library's .at() reject
# hour 24 — so midnight must be written as "00:00".
SCHEDULER_TIME = "00:00"  # daily scheduler run time, HH:MM (24-hour clock)
# Environment variables applied when running under the timer/scheduler.
TIMER_ENV_CONFIG = {
    'TIMER_MODE': '1',  # enable timer mode: data is saved to the Ranking_storage_list collection
    'AUTO_CONTINUE': '1'  # enable auto mode: skip detailed-data fetching to improve performance
}
# Function names skipped when auto-continue mode is active.
AUTO_CONTINUE_SKIP_FUNCTIONS = [
    'get_collection_video_details',  # skip fetching per-video details for a collection
    'scroll_comments',  # skip comment scrolling
    # add further function names to skip here
]
# TOS/火山云对象存储配置
TOS_CONFIG = {
'access_key_id': os.getenv('TOS_ACCESS_KEY_ID', 'AKLTYjQyYmE1ZDAwZTY5NGZiOWI3ODZkZDhhOWE4MzVjODE'),
@ -39,4 +52,13 @@ API_CONFIG = {
'OSS_HOST': TOS_CONFIG['self_domain']
}
def apply_timer_environment():
    """Export every TIMER_ENV_CONFIG entry into the process environment."""
    os.environ.update(TIMER_ENV_CONFIG)
def get_skip_functions():
    """Return a defensive copy of the function names skipped in auto mode."""
    return list(AUTO_CONTINUE_SKIP_FUNCTIONS)
print(f"Successfully loaded configuration for environment: {APP_ENV}")

View File

@ -11,9 +11,13 @@
{
"video_id": "7539690162612079872",
"episode_num": 0
},
{
"video_id": "7565426543275609378",
"episode_num": 0
}
],
"total_count": 3,
"last_update": "2025-10-22T09:55:17.087205",
"total_count": 4,
"last_update": "2025-10-27T10:05:06.655628",
"mix_name": "《小宝穿越|课本古诗文》"
}

View File

@ -51,9 +51,13 @@
{
"video_id": "7564982296051338534",
"episode_num": 0
},
{
"video_id": "7565346285362548019",
"episode_num": 0
}
],
"total_count": 13,
"last_update": "2025-10-25T12:53:08.640840",
"total_count": 14,
"last_update": "2025-10-27T11:04:23.469116",
"mix_name": "暗黑神话《葫芦兄弟》大电影"
}

View File

@ -31,7 +31,7 @@ import psutil
import random
import threading
import argparse
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor # 使用线程池实现异步滑动和监控
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
@ -48,7 +48,7 @@ sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
backend_dir = os.path.join(os.path.dirname(__file__), '..', '..')
sys.path.insert(0, backend_dir)
from database import db
from tos_client import oss_client
from handlers.Rankings.tos_client import oss_client
# 配置日志
@ -713,7 +713,7 @@ class DouyinPlayVVScraper:
except ValueError:
pass # 忽略无法转换为整数的情况
self.play_vv_items.append({
item_data = {
'play_vv': vv,
'formatted': self.format_count(vv),
'url': source_url,
@ -727,7 +727,9 @@ class DouyinPlayVVScraper:
'desc': desc, # 合集描述
'updated_to_episode': updated_to_episode, # 合集总集数
'timestamp': datetime.now().isoformat()
})
}
self.play_vv_items.append(item_data)
logging.info(f'提取到合集: {mix_name} (ID: {mix_id}) - {vv:,} 播放量')
if series_author:
logging.info(f' 作者: {series_author}')
@ -736,6 +738,14 @@ class DouyinPlayVVScraper:
if updated_to_episode > 0:
logging.info(f' 总集数: {updated_to_episode}')
# 只在非定时器模式下使用实时保存
is_timer_mode = os.environ.get('TIMER_MODE') == '1'
if not is_timer_mode:
logging.info(f'立即保存合集数据: {mix_name}')
self.save_single_item_to_mongodb(item_data)
else:
logging.info(f'定时器模式:暂存合集数据: {mix_name},将在最后批量保存')
# 递归搜索子对象
for key, value in obj.items():
if isinstance(value, (dict, list)):
@ -766,7 +776,7 @@ class DouyinPlayVVScraper:
if episodes > 0:
logging.info(f"从statis.updated_to_episode提取到集数: {episodes}")
self.play_vv_items.append({
item_data = {
'play_vv': vv,
'formatted': self.format_count(vv),
'url': source_url,
@ -776,8 +786,18 @@ class DouyinPlayVVScraper:
'mix_id': mix_id, # 合集ID
'updated_to_episode': episodes if episodes > 0 else None, # 从statis.updated_to_episode提取的集数
'timestamp': datetime.now().isoformat()
})
}
self.play_vv_items.append(item_data)
logging.info(f'正则提取到合集: {mix_name} (ID: {mix_id}) - {vv:,} 播放量')
# 只在非定时器模式下使用实时保存
is_timer_mode = os.environ.get('TIMER_MODE') == '1'
if not is_timer_mode:
logging.info(f'立即保存正则提取的合集数据: {mix_name}')
self.save_single_item_to_mongodb(item_data)
else:
logging.info(f'定时器模式:暂存正则提取的合集数据: {mix_name},将在最后批量保存')
except Exception:
continue
@ -787,7 +807,7 @@ class DouyinPlayVVScraper:
vv = int(match)
# 检查是否已经存在相同的play_vv
if not any(item['play_vv'] == vv for item in self.play_vv_items):
self.play_vv_items.append({
item_data = {
'play_vv': vv,
'formatted': self.format_count(vv),
'url': source_url,
@ -797,7 +817,18 @@ class DouyinPlayVVScraper:
'mix_id': '', # 未知mix_id
'updated_to_episode': None, # 未知集数
'timestamp': datetime.now().isoformat()
})
}
self.play_vv_items.append(item_data)
logging.info(f'兜底提取到播放量: {vv:,}')
# 只在非定时器模式下使用实时保存
is_timer_mode = os.environ.get('TIMER_MODE') == '1'
if not is_timer_mode:
logging.info(f'立即保存兜底提取的数据: {vv:,} 播放量')
self.save_single_item_to_mongodb(item_data)
else:
logging.info(f'定时器模式:暂存兜底提取的数据: {vv:,} 播放量,将在最后批量保存')
except Exception:
continue
@ -851,7 +882,6 @@ class DouyinPlayVVScraper:
# 可能是base64编码
if body_obj.get('base64Encoded'):
try:
import base64
body_text = base64.b64decode(body_text).decode('utf-8', errors='ignore')
except Exception:
pass
@ -964,7 +994,7 @@ class DouyinPlayVVScraper:
def upload_cover_image(self, cover_url, mix_name):
"""
上传封面图片到TOS并返回永久链接带去重功能
上传封面图片到TOS并返回永久链接带去重功能和重试机制
Args:
cover_url: 临时封面图片链接
@ -976,7 +1006,6 @@ class DouyinPlayVVScraper:
if not cover_url:
return cover_url
try:
# 提取图片ID
image_id = self.extract_douyin_image_id(cover_url)
@ -1005,16 +1034,37 @@ class DouyinPlayVVScraper:
random_filename = f"{uuid.uuid4().hex}{file_extension}"
object_key = f"media/rank/{random_filename}"
logging.info(f'开始上传封面图片: {mix_name}')
# 重试机制最多尝试3次
max_retries = 3
last_error = None
for attempt in range(max_retries):
try:
logging.info(f'开始上传封面图片 (尝试 {attempt + 1}/{max_retries}): {mix_name}')
logging.info(f'封面图片URL: {cover_url}')
logging.info(f'目标对象键: {object_key}')
# 从URL上传到TOS并获取新的URL
oss_url = oss_client.upload_from_url(
url=cover_url,
object_key=object_key,
return_url=True
return_url=True,
timeout=30 # 30秒超时
)
# 验证上传是否成功检查返回的URL是否包含预期的域名
if not oss_url or not isinstance(oss_url, str):
raise Exception(f"上传返回了无效的URL: {oss_url}")
# 检查URL格式是否正确
expected_domain = oss_client.self_domain
if expected_domain not in oss_url:
raise Exception(f"上传返回的URL域名不正确: {oss_url}, 期望包含: {expected_domain}")
# 检查URL是否包含正确的对象键
if object_key not in oss_url:
raise Exception(f"上传返回的URL不包含对象键: {oss_url}, 期望包含: {object_key}")
logging.info(f'封面图片上传成功: {mix_name} -> {oss_url}')
# 如果有图片ID将结果缓存
@ -1025,11 +1075,32 @@ class DouyinPlayVVScraper:
return oss_url
except Exception as e:
logging.error(f'封面图片上传失败: {mix_name} - {str(e)}')
last_error = e
error_msg = str(e)
logging.warning(f'封面图片上传失败 (尝试 {attempt + 1}/{max_retries}): {mix_name} - {error_msg}')
# 如果不是最后一次尝试,等待一段时间后重试
if attempt < max_retries - 1:
import time
wait_time = (attempt + 1) * 2 # 递增等待时间2秒、4秒、6秒
logging.info(f'等待 {wait_time} 秒后重试...')
time.sleep(wait_time)
# 为重试生成新的文件名,避免可能的冲突
random_filename = f"{uuid.uuid4().hex}{file_extension}"
object_key = f"media/rank/{random_filename}"
# 所有重试都失败了
logging.error(f'封面图片上传彻底失败 (已尝试 {max_retries} 次): {mix_name} - 最后错误: {last_error}')
logging.error(f'将使用原始链接作为回退: {cover_url}')
return cover_url # 上传失败时返回原链接
def save_to_mongodb(self):
"""将数据保存到MongoDB"""
"""
将数据批量保存到MongoDB
注意此方法现在作为备用保留正常流程使用实时保存功能(save_single_item_to_mongodb)
避免重复保存数据
"""
if self.collection is None:
logging.warning('MongoDB未连接跳过数据库保存')
return
@ -1049,16 +1120,33 @@ class DouyinPlayVVScraper:
# 处理封面图片
permanent_cover_url = ''
upload_success = False
if original_cover_url:
# 上传封面图片到TOS获取永久链接
permanent_cover_url = self.upload_cover_image(original_cover_url, mix_name)
# 如果上传失败且有原始链接,记录警告但继续保存
if permanent_cover_url == original_cover_url:
logging.warning(f'封面图片上传失败,使用原始链接: {mix_name}')
# 检查上传是否成功
if permanent_cover_url != original_cover_url:
# 上传成功URL已经改变
upload_success = True
logging.info(f'封面图片上传成功,已获得永久链接: {mix_name}')
else:
# 上传失败,使用原始链接作为回退
upload_success = False
logging.warning(f'封面图片上传失败,回退使用原始链接: {mix_name}')
logging.warning(f'原始链接: {original_cover_url}')
# 可以在这里添加额外的回退策略,比如:
# 1. 尝试使用备用的图片链接
# 2. 设置一个默认的占位图片
# 3. 记录失败的链接以便后续重试
# 当前策略:保持原始链接,但在数据库中标记上传状态
else:
# 没有封面图片,使用空字符串
permanent_cover_url = ''
upload_success = True # 没有图片不算失败
# 获取合集中的所有视频ID
mix_id = item.get('mix_id', '')
@ -1158,6 +1246,7 @@ class DouyinPlayVVScraper:
'rank': 0, # 临时设置,后面会重新计算
'cover_image_url_original': original_cover_url, # 保存原始临时链接用于调试
'cover_image_url': permanent_cover_url, # 合集封面图片永久链接
'cover_upload_success': upload_success, # 封面图片上传是否成功
'cover_backup_urls': item.get('cover_backup_urls', []), # 封面图片备用链接列表
# 新增的字段
'series_author': item.get('series_author', ''), # 合集作者/影视工作室
@ -1182,20 +1271,136 @@ class DouyinPlayVVScraper:
max_play_vv = max(doc['play_vv'] for doc in documents) if documents else 0
logging.info(f'MongoDB保存统计: 总播放量={total_play_vv:,}, 最高播放量={max_play_vv:,}')
logging.info(f'保存的字段: batch_time, mix_name, video_url, playcount, play_vv, request_id, rank, cover_image_url_original, cover_image_url, series_author, desc, updated_to_episode')
logging.info(f'保存的字段: batch_time, mix_name, video_url, playcount, play_vv, request_id, rank, cover_image_url_original, cover_image_url, cover_upload_success, series_author, desc, updated_to_episode')
# 统计封面图片处理情况
cover_count = sum(1 for doc in documents if doc.get('cover_image_url'))
original_count = sum(1 for item in self.play_vv_items if item.get('cover_image_url'))
success_count = sum(1 for doc in documents if doc.get('cover_image_url') and doc.get('cover_image_url') != doc.get('cover_image_url_original', ''))
upload_success_count = sum(1 for doc in documents if doc.get('cover_upload_success', False))
upload_failed_count = sum(1 for doc in documents if doc.get('cover_image_url_original') and not doc.get('cover_upload_success', False))
logging.info(f'封面图片统计: {cover_count}/{len(documents)} 个合集有封面链接')
logging.info(f'封面上传统计: {success_count}/{original_count} 个封面成功上传到TOS')
logging.info(f'封面上传统计: {upload_success_count}/{original_count} 个封面成功上传到TOS')
if upload_failed_count > 0:
logging.warning(f'封面上传失败: {upload_failed_count} 个封面上传失败,使用原始链接')
logging.info(f'图片缓存统计: 当前缓存 {len(self.image_cache)} 个图片映射')
except Exception as e:
logging.error(f'保存到MongoDB时出错: {e}')
def save_single_item_to_mongodb(self, item: dict) -> None:
    """Immediately persist a single scraped collection item to MongoDB.

    This is the real-time (non-timer) save path: it uploads the cover image,
    resolves the collection's episode video ids, computes the item's rank
    within today's batch, always inserts a NEW record (history is preserved,
    nothing is updated in place), then bumps the rank of today's records
    with a lower play count.

    Args:
        item: dict describing one collection as produced by the extraction
            code (keys include play_vv, formatted, mix_name, mix_id,
            cover_image_url, video_url, request_id, series_author, desc,
            updated_to_episode, cover_backup_urls).
    """
    # Bail out quietly when no MongoDB collection handle is available.
    if self.collection is None:
        logging.warning('MongoDB未连接,跳过单条数据保存')
        return
    try:
        batch_time = datetime.now()
        # Original (temporary) cover image URL as scraped.
        original_cover_url = item.get('cover_image_url', '')
        mix_name = item.get('mix_name', '')
        mix_id = item.get('mix_id', '')
        # Upload the cover image to TOS to obtain a permanent link.
        permanent_cover_url = ''
        upload_success = False
        if original_cover_url:
            permanent_cover_url = self.upload_cover_image(original_cover_url, mix_name)
            # upload_cover_image returns the original URL on failure, so an
            # unchanged URL means the upload did not succeed.
            if permanent_cover_url != original_cover_url:
                upload_success = True
                logging.info(f'封面图片上传成功: {mix_name}')
            else:
                upload_success = False
                logging.warning(f'封面图片上传失败,使用原始链接: {mix_name}')
        else:
            permanent_cover_url = ''
            upload_success = True  # no image at all is not counted as a failure
        # Resolve all episode video ids for the collection (in timer mode the
        # detailed interaction data is intentionally not fetched).
        episode_video_ids = []
        episode_details = []
        if mix_id:
            logging.info(f'获取合集 {mix_name} 的视频ID')
            current_episode_count = item.get('updated_to_episode', 0)
            episode_video_ids = self.get_collection_videos(
                mix_id=mix_id,
                mix_name=mix_name,
                current_episode_count=current_episode_count
            )
            # Build per-episode info; interaction metrics are zeroed because
            # detailed data is skipped on this fast path.
            # NOTE(review): nesting of this loop under `if mix_id:` is
            # reconstructed from a diff rendering with stripped indentation —
            # confirm against the original file.
            total_episodes = item.get('updated_to_episode', 0)
            for i in range(total_episodes):
                episode_number = i + 1
                video_id = episode_video_ids[i] if i < len(episode_video_ids) else ''
                episode_info = {
                    'episode_number': episode_number,
                    'video_id': video_id,
                    'likes': 0,  # not fetched in timer mode
                    'shares': 0,
                    'favorites': 0,
                    'likes_formatted': '0',
                    'shares_formatted': '0',
                    'favorites_formatted': '0',
                    'comments': []
                }
                episode_details.append(episode_info)
        # Current rank = 1 + number of today's records with a strictly
        # higher play count (batch_time truncated to today's midnight).
        higher_count = self.collection.count_documents({
            'play_vv': {'$gt': item.get('play_vv', 0)},
            'batch_time': {'$gte': batch_time.replace(hour=0, minute=0, second=0, microsecond=0)}
        })
        current_rank = higher_count + 1
        # Build the document — a new record is always inserted to keep history.
        doc = {
            'batch_time': batch_time,
            'mix_name': mix_name,
            'video_url': item.get('video_url', ''),
            'playcount': item.get('formatted', ''),
            'play_vv': item.get('play_vv', 0),
            'request_id': item.get('request_id', ''),
            'rank': current_rank,
            'cover_image_url_original': original_cover_url,
            'cover_image_url': permanent_cover_url,
            'cover_upload_success': upload_success,
            'cover_backup_urls': item.get('cover_backup_urls', []),
            'series_author': item.get('series_author', ''),
            'desc': item.get('desc', ''),
            'updated_to_episode': item.get('updated_to_episode', 0),
            'episode_video_ids': episode_video_ids,
            'episode_details': episode_details,
            'created_at': datetime.now()
        }
        # Always insert a new record — existing records are never updated.
        result = self.collection.insert_one(doc)
        logging.info(f'边抓取边保存新记录: {mix_name} - {item.get("play_vv", 0):,} 播放量 (排名: {current_rank})')
        # Push down the rank of today's records with a lower play count,
        # excluding the record just inserted.
        self.collection.update_many(
            {
                'play_vv': {'$lt': item.get('play_vv', 0)},
                'batch_time': {'$gte': batch_time.replace(hour=0, minute=0, second=0, microsecond=0)},
                '_id': {'$ne': result.inserted_id}
            },
            {'$inc': {'rank': 1}}
        )
    except Exception as e:
        logging.error(f'实时保存单条数据到MongoDB时出错: {e}')
def get_video_info(self, video_id: str) -> dict:
"""获取视频详细信息
Args:
@ -1432,6 +1637,11 @@ class DouyinPlayVVScraper:
Returns:
list: 收集到的所有评论数据
"""
# 检查AUTO_CONTINUE环境变量如果设置为'1'则跳过评论滑动
if os.environ.get('AUTO_CONTINUE') == '1' or self.auto_continue:
logging.info(f'🚀 AUTO_CONTINUE模式跳过视频 {video_id} 的评论滑动加载')
return []
all_comments = []
collected_comment_ids = set()
@ -1447,10 +1657,6 @@ class DouyinPlayVVScraper:
# 点击评论区域以触发网络请求
self._click_comment_area()
# 使用线程池实现异步滑动和监控
from concurrent.futures import ThreadPoolExecutor
import threading
# 创建共享状态对象,用于任务间通信
shared_state = {
'scroll_completed': False,
@ -2107,6 +2313,15 @@ class DouyinPlayVVScraper:
'error': None
}
# 检查AUTO_CONTINUE环境变量如果设置为'1'则跳过详细数据获取
if os.environ.get('AUTO_CONTINUE') == '1' or self.auto_continue:
logging.info(f'🚀 AUTO_CONTINUE模式跳过视频 {video_id} 的详细数据获取(点赞、收藏、分享、评论)')
video_details['success'] = True
video_details['error'] = 'AUTO_CONTINUE模式跳过详细数据获取'
return video_details
logging.info(f'🔍 get_video_details 被调用: video_id={video_id}, max_comments={max_comments}')
try:
# 确保driver已初始化
if self.driver is None:
@ -2424,9 +2639,9 @@ class DouyinPlayVVScraper:
Returns:
list: 包含每个视频详细数据的列表
"""
# 定时器模式下跳过此函数
if os.environ.get('TIMER_MODE') == '1':
logging.info(f'定时器模式:跳过 get_collection_video_details 函数')
# AUTO_CONTINUE模式下跳过此函数
if os.environ.get('AUTO_CONTINUE') == '1' or self.auto_continue:
logging.info(f'🚀 AUTO_CONTINUE模式:跳过 get_collection_video_details 函数(合集视频详细数据获取)')
return []
if not episode_video_ids:
@ -2502,8 +2717,17 @@ class DouyinPlayVVScraper:
self.collect_network_bodies()
self.parse_ssr_data()
self.dedupe()
# 根据模式选择保存方式
is_timer_mode = os.environ.get('TIMER_MODE') == '1'
if is_timer_mode:
# 定时器模式使用批量保存所有数据使用相同的batch_time
self.save_results()
logging.info('完成play_vv数量: %d', len(self.play_vv_items))
logging.info('定时器模式完成批量保存play_vv数量: %d', len(self.play_vv_items))
else:
# 普通模式:数据已通过实时保存功能保存
logging.info('普通模式完成play_vv数量: %d', len(self.play_vv_items))
logging.info('所有数据已通过实时保存功能保存到数据库')
finally:
if self.driver:
try: