diff --git a/backend/Timer_worker.py b/backend/Timer_worker.py
index b33ceb7..55e9c8e 100644
--- a/backend/Timer_worker.py
+++ b/backend/Timer_worker.py
@@ -179,24 +179,44 @@ class DouyinAutoScheduler:
             logging.info(f"📊 今日数据去重后:{len(today_videos)} 个独特短剧(原始数据:{len(today_videos_raw)} 条)")

-            # 获取昨天的榜单数据(如果存在),取最新的计算结果
-            yesterday_ranking = rankings_collection.find_one({
-                "date": yesterday_str,
-                "type": "comprehensive"
-            }, sort=[("calculation_sequence", -1)])
+            # 从Ranking_storage_list中获取昨天最后一次抓取的数据
+            yesterday_start = datetime.combine(yesterday, datetime.min.time())
+            yesterday_end = datetime.combine(yesterday, datetime.max.time())
+
+            # 获取昨天的最后一次抓取数据(按batch_time排序取最新的)
+            yesterday_latest_batch = douyin_collection.find_one({
+                "batch_time": {
+                    "$gte": yesterday_start,
+                    "$lte": yesterday_end
+                }
+            }, sort=[("batch_time", -1)])

             yesterday_data = {}
-            if yesterday_ranking and "data" in yesterday_ranking:
+            if yesterday_latest_batch:
+                yesterday_batch_time = yesterday_latest_batch.get("batch_time")
+                logging.info(f"📊 找到昨天最后一次抓取时间: {yesterday_batch_time}")
+
+                # 获取昨天最后一次抓取的所有数据
+                yesterday_videos_raw = list(douyin_collection.find({
+                    "batch_time": yesterday_batch_time
+                }).sort("play_vv", -1))
+
+                # 按短剧名称去重,每个短剧只保留播放量最高的一条
+                unique_yesterday_videos = {}
+                for video in yesterday_videos_raw:
+                    mix_name = video.get("mix_name", "")
+                    if mix_name and (mix_name not in unique_yesterday_videos or video.get("play_vv", 0) > unique_yesterday_videos[mix_name].get("play_vv", 0)):
+                        unique_yesterday_videos[mix_name] = video
+
                 # 将昨天的数据转换为字典,以短剧名称为键
-                for item in yesterday_ranking["data"]:
-                    title = item.get("title", "")
-                    if title:
-                        yesterday_data[title] = {
-                            "rank": item.get("rank", 0),
-                            "play_vv": item.get("play_vv", 0),
-                            "video_id": item.get("video_id", "")
-                        }
-                logging.info(f"📊 找到昨天的榜单数据,共 {len(yesterday_data)} 个短剧")
+                for mix_name, video in unique_yesterday_videos.items():
+                    yesterday_data[mix_name] = {
+                        "rank": 0,  # 原始数据没有排名,设为0
+                        "play_vv": video.get("play_vv", 0),
+                        "video_id": str(video.get("_id", ""))
+                    }
+
+                logging.info(f"📊 找到昨天的原始数据,共 {len(yesterday_data)} 个短剧(原始数据:{len(yesterday_videos_raw)} 条)")
             else:
                 logging.info("📊 未找到昨天的原始数据,将作为首次生成")
diff --git a/backend/config.py b/backend/config.py
index 96aed7a..ee501e4 100644
--- a/backend/config.py
+++ b/backend/config.py
@@ -3,9 +3,9 @@ import importlib

 # 数据库配置
 MONGO_URI = "mongodb://localhost:27017"
-MONGO_DB_NAME = "kemeng_media"
+# MONGO_DB_NAME = "Rankings"
 # MONGO_URI = "mongodb://mongouser:Jdei2243afN@172.16.0.6:27017,172.16.0.4:27017/test?replicaSet=cmgo-r6qkaern_0&authSource=admin"
-# MONGO_DB_NAME = "kemeng_media"
+MONGO_DB_NAME = "kemeng_media"

 # 应用配置
 APP_ENV = os.getenv('APP_ENV', 'development')
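Reviewer note: the Timer_worker.py change above replaces the old rankings lookup with "take yesterday's last scrape batch, dedupe by mix_name, keep the highest play_vv". A minimal standalone sketch of that dedupe rule (the sample documents are invented here):

```python
# Keep only the highest-play_vv document per mix_name, as the new
# Timer_worker.py block does; docs stands in for yesterday_videos_raw.
docs = [
    {"mix_name": "剧A", "play_vv": 1200},
    {"mix_name": "剧A", "play_vv": 3400},
    {"mix_name": "剧B", "play_vv": 500},
]

unique = {}
for video in docs:
    name = video.get("mix_name", "")
    if name and (name not in unique or video.get("play_vv", 0) > unique[name].get("play_vv", 0)):
        unique[name] = video

assert unique["剧A"]["play_vv"] == 3400  # duplicates collapse to the max
```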
diff --git a/backend/handlers/Rankings/rank_data_scraper.py b/backend/handlers/Rankings/rank_data_scraper.py
index 69e1e2f..4e6a289 100644
--- a/backend/handlers/Rankings/rank_data_scraper.py
+++ b/backend/handlers/Rankings/rank_data_scraper.py
@@ -49,6 +49,8 @@
 backend_dir = os.path.join(os.path.dirname(__file__), '..', '..')
 sys.path.insert(0, backend_dir)
 from database import db
 from handlers.Rankings.tos_client import oss_client
+import config
+import uuid  # __init__ 中的 uuid.uuid4() 需要(若文件顶部已导入则可省略)

 # 配置日志
@@ -79,6 +80,22 @@ class DouyinPlayVVScraper:
         self.collection = None
         self.image_cache = {}  # 图片ID到TOS链接的缓存映射 {image_id: tos_url}
         self.all_collected_comments = []  # 存储所有收集到的评论数据
+
+        # 实时存储相关属性
+        self.batch_id = str(uuid.uuid4())  # 每次运行的唯一标识
+        self.batch_time = datetime.now()  # 批次时间
+        self.item_sequence = 0  # 数据项序号
+        self.saved_items = set()  # 已保存的数据项(用于去重)
+
+        # 根据运行模式自动选择存储方式
+        is_timer_mode = os.environ.get('TIMER_MODE') == '1'
+        self.realtime_save_enabled = not is_timer_mode  # 定时器模式使用批量存储,普通模式使用实时存储
+
+        if self.realtime_save_enabled:
+            logging.info(f'[普通模式] 启用实时存储,批次ID: {self.batch_id}')
+        else:
+            logging.info('[定时器模式] 使用批量存储')
+
         self._cleanup_old_profiles()
         self._setup_mongodb()
         self._load_image_cache()
@@ -89,9 +106,6 @@ class DouyinPlayVVScraper:
             # 使用 database.py 中的连接
             self.db = db

-            # 根据运行模式选择集合
-            is_timer_mode = os.environ.get('TIMER_MODE') == '1'
-            mongo_collection = 'Ranking_storage_list' if is_timer_mode else 'Rankings_list'
             # 根据运行模式选择集合
             is_timer_mode = os.environ.get('TIMER_MODE') == '1'
             mongo_collection = 'Ranking_storage_list' if is_timer_mode else 'Rankings_list'
@@ -99,7 +113,6 @@ class DouyinPlayVVScraper:
             logging.info(f'MongoDB连接成功,使用数据库: {self.db.name},集合: {mongo_collection}')
             logging.info(f'当前运行模式: {"定时器模式" if is_timer_mode else "普通模式"}')
-            logging.info(f'当前运行模式: {"定时器模式" if is_timer_mode else "普通模式"}')

         except Exception as e:
             logging.error(f'MongoDB连接失败: {e}')
@@ -335,8 +348,15 @@ class DouyinPlayVVScraper:
         logging.info("请在弹出的浏览器中手动完成登录。")

         if self.auto_continue:
-            logging.info('自动继续模式,跳过手动等待...')
-            time.sleep(5)
+            logging.info('自动继续模式,假设已登录并跳过手动等待...')
+            # 在自动模式下,尝试导航到起始URL
+            try:
+                logging.info(f"自动模式:导航到起始URL: {self.start_url}")
+                self.driver.get(self.start_url)
+                time.sleep(3)  # 等待页面加载
+                logging.info("自动模式:假设登录成功,继续执行...")
+            except Exception as e:
+                logging.warning(f"自动模式导航失败: {e},继续执行...")
             return

         logging.info("进入手动登录确认循环...")
@@ -393,50 +413,73 @@ class DouyinPlayVVScraper:
             logging.warning(f"检查登录状态时出错: {e}")
             return False

-    def _detect_login_status(self, timeout: int = 600) -> bool:
+    def _detect_login_status(self, timeout: int = 30) -> bool:
         """自动检测是否已登录"""
         try:
             start = time.time()
+            attempt = 0
             while time.time() - start < timeout:
+                attempt += 1
+                logging.info(f"登录检测尝试 {attempt}...")
                 time.sleep(2)

+                # 检查登录状态的多个选择器
                 selectors = [
                     '[data-e2e="user-avatar"]',
                     '.user-avatar',
                     '[class*="avatar"]',
-                    '[class*="Avatar"]'
+                    '[class*="Avatar"]',
+                    '.avatar',
+                    'img[alt*="头像"]',
+                    'img[alt*="avatar"]'
                 ]

                 for selector in selectors:
                     try:
                         elements = self.driver.find_elements("css selector", selector)
                         if elements:
-                            logging.info("检测到用户头像,确认已登录")
+                            logging.info(f"检测到用户头像 (选择器: {selector}),确认已登录")
                             return True
-                    except Exception:
+                    except Exception as e:
+                        logging.debug(f"选择器 {selector} 检测失败: {e}")
                         continue

                 # 检查是否有登录按钮(表示未登录)
                 login_selectors = [
                     '[data-e2e="login-button"]',
                     'button[class*="login"]',
-                    'a[href*="login"]'
+                    'a[href*="login"]',
+                    '.login-button'
+                    # 注:CSS 不支持 :contains("登录") 这类文本匹配,按钮文本定位需改用 XPath
                 ]

                 for selector in login_selectors:
                     try:
                         elements = self.driver.find_elements("css selector", selector)
                         if elements:
-                            logging.info("检测到登录按钮,用户未登录")
+                            logging.info(f"检测到登录按钮 (选择器: {selector}),用户未登录")
                             return False
-                    except Exception:
+                    except Exception as e:
+                        logging.debug(f"登录按钮选择器 {selector} 检测失败: {e}")
                         continue
+
+                # 添加页面源码检查
+                try:
+                    page_source = self.driver.page_source
+                    if "登录" in page_source and "头像" not in page_source:
+                        logging.info("页面源码显示需要登录")
+                        return False
+                    elif any(keyword in page_source for keyword in ["我的", "收藏", "合集"]):
+                        logging.info("页面源码显示已登录")
+                        return True
+                except Exception as e:
+                    logging.debug(f"页面源码检查失败: {e}")

-            logging.info("登录状态检测超时,假设未登录")
-            return False
+            logging.warning(f"登录状态检测超时 ({timeout}秒),假设已登录并继续")
+            return True  # 改为假设已登录,避免卡住

         except Exception as e:
-            logging.warning(f"登录状态检测出错: {e}")
-            return False
+            logging.warning(f"登录状态检测出错: {e},假设已登录并继续")
+            return True  # 改为假设已登录,避免卡住

     def trigger_loading(self):
         logging.info('触发数据加载:滚动 + 刷新')
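Reviewer note: `_detect_login_status` now polls several probes and, on timeout, assumes the session is logged in rather than blocking the run. A self-contained sketch of that loop shape, with the selector probes stubbed out as callables:

```python
# Poll "logged in" / "logged out" checks until one fires; fall back to an
# optimistic True on timeout, mirroring the diff's new behavior.
import time

def detect_login(check_logged_in, check_logged_out, timeout: float = 30.0) -> bool:
    start = time.time()
    while time.time() - start < timeout:
        time.sleep(2)
        if check_logged_in():    # e.g. an avatar selector matched
            return True
        if check_logged_out():   # e.g. a login button matched
            return False
    return True  # optimistic fallback so the scrape never stalls here

# detect_login(lambda: False, lambda: False) would return True after ~30s.
```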
@@ -713,6 +756,7 @@ class DouyinPlayVVScraper:
                             except ValueError:
                                 pass  # 忽略无法转换为整数的情况

+                    # 构建合集数据
                     item_data = {
                         'play_vv': vv,
                         'formatted': self.format_count(vv),
@@ -729,7 +773,13 @@ class DouyinPlayVVScraper:
                         'timestamp': datetime.now().isoformat()
                     }

+                    # 添加到列表(保持原有逻辑)
                     self.play_vv_items.append(item_data)
+
+                    # 实时保存到数据库
+                    if self.realtime_save_enabled:
+                        self.save_single_item_realtime(item_data)
+
                     logging.info(f'提取到合集: {mix_name} (ID: {mix_id}) - {vv:,} 播放量')
                     if series_author:
                         logging.info(f'  作者: {series_author}')
@@ -737,14 +787,6 @@ class DouyinPlayVVScraper:
                         logging.info(f'  描述: {desc[:100]}{"..." if len(desc) > 100 else ""}')
                     if updated_to_episode > 0:
                         logging.info(f'  总集数: {updated_to_episode}')
-
-                    # 只在非定时器模式下使用实时保存
-                    is_timer_mode = os.environ.get('TIMER_MODE') == '1'
-                    if not is_timer_mode:
-                        logging.info(f'立即保存合集数据: {mix_name}')
-                        self.save_single_item_to_mongodb(item_data)
-                    else:
-                        logging.info(f'定时器模式:暂存合集数据: {mix_name},将在最后批量保存')

             # 递归搜索子对象
             for key, value in obj.items():
@@ -776,6 +818,7 @@ class DouyinPlayVVScraper:
                     if episodes > 0:
                         logging.info(f"从statis.updated_to_episode提取到集数: {episodes}")

+                    # 构建合集数据
                     item_data = {
                         'play_vv': vv,
                         'formatted': self.format_count(vv),
@@ -788,16 +831,14 @@ class DouyinPlayVVScraper:
                         'timestamp': datetime.now().isoformat()
                     }

+                    # 添加到列表(保持原有逻辑)
                     self.play_vv_items.append(item_data)
-                    logging.info(f'正则提取到合集: {mix_name} (ID: {mix_id}) - {vv:,} 播放量')

-                    # 只在非定时器模式下使用实时保存
-                    is_timer_mode = os.environ.get('TIMER_MODE') == '1'
-                    if not is_timer_mode:
-                        logging.info(f'立即保存正则提取的合集数据: {mix_name}')
-                        self.save_single_item_to_mongodb(item_data)
-                    else:
-                        logging.info(f'定时器模式:暂存正则提取的合集数据: {mix_name},将在最后批量保存')
+                    # 实时保存到数据库
+                    if self.realtime_save_enabled:
+                        self.save_single_item_realtime(item_data)
+
+                    logging.info(f'正则提取到合集: {mix_name} (ID: {mix_id}) - {vv:,} 播放量')
                 except Exception:
                     continue

@@ -807,6 +848,7 @@ class DouyinPlayVVScraper:
                     vv = int(match)
                     # 检查是否已经存在相同的play_vv
                     if not any(item['play_vv'] == vv for item in self.play_vv_items):
+                        # 构建合集数据
                         item_data = {
                             'play_vv': vv,
                             'formatted': self.format_count(vv),
@@ -819,16 +861,12 @@ class DouyinPlayVVScraper:
                             'timestamp': datetime.now().isoformat()
                         }

+                        # 添加到列表(保持原有逻辑)
                         self.play_vv_items.append(item_data)
-                        logging.info(f'兜底提取到播放量: {vv:,}')

-                        # 只在非定时器模式下使用实时保存
-                        is_timer_mode = os.environ.get('TIMER_MODE') == '1'
-                        if not is_timer_mode:
-                            logging.info(f'立即保存兜底提取的数据: {vv:,} 播放量')
-                            self.save_single_item_to_mongodb(item_data)
-                        else:
-                            logging.info(f'定时器模式:暂存兜底提取的数据: {vv:,} 播放量,将在最后批量保存')
+                        # 实时保存到数据库(对于未知合集,可能不需要实时保存,但为了一致性还是保存)
+                        if self.realtime_save_enabled:
+                            self.save_single_item_realtime(item_data)
                 except Exception:
                     continue

@@ -924,7 +962,8 @@ class DouyinPlayVVScraper:
                                 vv = int(m)
                                 # 检查是否已经存在相同的play_vv
                                 if not any(item['play_vv'] == vv for item in self.play_vv_items):
-                                    self.play_vv_items.append({
+                                    # 构建合集数据
+                                    item_data = {
                                         'play_vv': vv,
                                         'formatted': self.format_count(vv),
                                         'url': 'page_source_statis',
@@ -932,7 +971,14 @@ class DouyinPlayVVScraper:
                                         'mix_name': '',  # 从statis中无法获取合集名称
                                         'video_url': '',  # 从statis中无法获取链接
                                         'timestamp': datetime.now().isoformat()
-                                    })
+                                    }
+
+                                    # 添加到列表(保持原有逻辑)
+                                    self.play_vv_items.append(item_data)
+
+                                    # 实时保存到数据库
+                                    if self.realtime_save_enabled:
+                                        self.save_single_item_realtime(item_data)
                             except Exception:
                                 pass
         except Exception:
            pass
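Reviewer note: every extraction path above now funnels through `save_single_item_realtime`, guarded by `realtime_save_enabled`. What keeps one pass from writing the same item twice is the `saved_items` set keyed by `mix_id_play_vv`; a standalone sketch of that guard (class name and sample items invented here):

```python
# One batch_id per run; a set of "mix_id_play_vv" keys dedupes repeat
# extractions within the run, mirroring save_collection_basic_info's check.
import uuid

class RealtimeGuard:
    def __init__(self):
        self.batch_id = str(uuid.uuid4())
        self.saved_items = set()

    def should_save(self, item: dict) -> bool:
        key = f"{item.get('mix_id', '')}_{item.get('play_vv', 0)}"
        if key in self.saved_items:
            return False
        self.saved_items.add(key)
        return True

guard = RealtimeGuard()
assert guard.should_save({"mix_id": "m1", "play_vv": 100})
assert not guard.should_save({"mix_id": "m1", "play_vv": 100})  # deduped
```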
@@ -950,10 +996,56 @@ class DouyinPlayVVScraper:
         self.play_vv_items = unique

     def save_results(self):
-        # 保存到MongoDB
-        self.save_to_mongodb()
+        if self.realtime_save_enabled and self.saved_items:
+            # 实时保存模式:只更新排名和统计信息
+            self.update_ranks_for_batch()
+            logging.info(f'[实时保存] 所有数据已通过实时保存功能保存到数据库,共 {len(self.saved_items)} 个合集')
+            logging.info(f'[实时保存] 批次ID: {self.batch_id}')
+        else:
+            # 传统批量保存模式
+            self.save_to_mongodb()
+            logging.info('结果已保存到MongoDB')

-        logging.info('结果已保存到MongoDB')
+    def update_ranks_for_batch(self):
+        """为当前批次的数据更新排名"""
+        if self.collection is None or not self.saved_items:
+            return
+
+        try:
+            # 获取当前批次的所有数据,按播放量排序
+            cursor = self.collection.find(
+                {'batch_id': self.batch_id},
+                {'_id': 1, 'play_vv': 1, 'mix_name': 1}
+            ).sort('play_vv', -1)
+
+            batch_items = list(cursor)
+
+            if not batch_items:
+                logging.warning(f'[实时保存] 未找到批次 {self.batch_id} 的数据')
+                return
+
+            # 批量更新排名
+            from pymongo import UpdateOne
+            bulk_operations = []
+            for rank, item in enumerate(batch_items, 1):
+                bulk_operations.append(
+                    UpdateOne(
+                        {'_id': item['_id']},
+                        {'$set': {'rank': rank}}
+                    )
+                )
+
+            if bulk_operations:
+                result = self.collection.bulk_write(bulk_operations)
+                logging.info(f'[实时保存] 成功更新 {result.modified_count} 个合集的排名')
+
+                # 输出排名统计
+                total_play_vv = sum(item['play_vv'] for item in batch_items)
+                max_play_vv = batch_items[0]['play_vv'] if batch_items else 0
+                logging.info(f'[实时保存] 排名统计: 总播放量={total_play_vv:,}, 最高播放量={max_play_vv:,}')
+
+        except Exception as e:
+            logging.error(f'[实时保存] 更新排名失败: {e}')

     def extract_douyin_image_id(self, cover_url):
         """
@@ -1096,11 +1188,12 @@ class DouyinPlayVVScraper:
         return cover_url  # 上传失败时返回原链接

     def save_to_mongodb(self):
-        """
-        将数据批量保存到MongoDB
-        注意:此方法现在作为备用保留,正常流程使用实时保存功能(save_single_item_to_mongodb)
-        避免重复保存数据
-        """
+        """将数据保存到MongoDB"""
+        # 如果启用了实时保存,跳过批量保存
+        if self.realtime_save_enabled:
+            logging.info('[批量保存] 实时保存模式已启用,跳过批量保存')
+            return
+
         if self.collection is None:
             logging.warning('MongoDB未连接,跳过数据库保存')
             return
@@ -1165,8 +1258,7 @@ class DouyinPlayVVScraper:
                     logging.info(f'开始获取合集 {mix_name} 的视频详细互动数据')
                     video_details_list = self.get_collection_video_details(
                         episode_video_ids=episode_video_ids,
-                        mix_name=mix_name,
-                        max_comments_per_video=10  # 每个视频最多获取10条评论
+                        mix_name=mix_name
                     )

                     # 构建每集的详细信息,使用获取到的真实数据
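Reviewer suggestion (not part of this diff): `update_ranks_for_batch` sorts a whole batch by `play_vv` and then bulk-rewrites ranks, so a compound index on `(batch_id, play_vv)` would keep both the sorted scan and the per-`_id` rewrites cheap. A one-off setup sketch, assuming the local `MONGO_URI` and the collection names used in this diff:

```python
# Optional index for the batch-ranking query; database/collection names are
# taken from this diff, the index name is illustrative.
from pymongo import MongoClient, ASCENDING, DESCENDING

client = MongoClient("mongodb://localhost:27017")  # MONGO_URI from config.py
collection = client["kemeng_media"]["Rankings_list"]

collection.create_index(
    [("batch_id", ASCENDING), ("play_vv", DESCENDING)],
    name="batch_rank_lookup",
)
```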
@@ -1288,118 +1380,303 @@ class DouyinPlayVVScraper:
         except Exception as e:
             logging.error(f'保存到MongoDB时出错: {e}')

-    def save_single_item_to_mongodb(self, item: dict):
-        """将单条数据立即保存到MongoDB
-        Args:
-            item: 包含合集信息的字典
-        """
-        if self.collection is None:
-            logging.warning('MongoDB未连接,跳过单条数据保存')
-            return
+    def save_collection_basic_info(self, item_data: dict):
+        """立即保存合集基础信息到数据库(第一阶段保存)"""
+        logging.info(f'[立即保存] 保存合集基础信息: {item_data.get("mix_name", "未知")}')
+
+        if not self.realtime_save_enabled or self.collection is None:
+            logging.warning('[立即保存] 跳过保存 - 实时保存未启用或数据库未连接')
+            return None

         try:
-            batch_time = datetime.now()
+            # 生成唯一标识用于去重
+            item_key = f"{item_data.get('mix_id', '')}_{item_data.get('play_vv', 0)}"

-            # 获取原始封面图片URL
-            original_cover_url = item.get('cover_image_url', '')
-            mix_name = item.get('mix_name', '')
-            mix_id = item.get('mix_id', '')
+            if item_key in self.saved_items:
+                logging.warning(f'[立即保存] 合集数据已存在,跳过保存: {item_data.get("mix_name", "")}')
+                return None

-            # 处理封面图片
+            # 增加序号
+            self.item_sequence += 1
+
+            # 获取基础信息
+            mix_name = item_data.get('mix_name', '')
+            mix_id = item_data.get('mix_id', '')
+            original_cover_url = item_data.get('cover_image_url', '')
+            current_episode_count = item_data.get('updated_to_episode', 0)
+
+            # 处理封面图片(如果有的话)
             permanent_cover_url = ''
             upload_success = False

             if original_cover_url:
-                # 上传封面图片到TOS获取永久链接
                 permanent_cover_url = self.upload_cover_image(original_cover_url, mix_name)
-
-                # 检查上传是否成功
-                if permanent_cover_url != original_cover_url:
-                    upload_success = True
-                    logging.info(f'封面图片上传成功: {mix_name}')
-                else:
-                    upload_success = False
-                    logging.warning(f'封面图片上传失败,使用原始链接: {mix_name}')
+                upload_success = permanent_cover_url != original_cover_url
+                if upload_success:
+                    logging.info(f'[立即保存] 封面图片上传成功: {mix_name}')
             else:
-                permanent_cover_url = ''
                 upload_success = True  # 没有图片不算失败

-            # 获取合集中的所有视频ID(定时器模式时不获取详细互动数据)
-            episode_video_ids = []
-            episode_details = []
+            # 创建基础的episode_details结构
+            episode_details = [
+                {
+                    'episode_number': i + 1,
+                    'video_id': '',  # 稍后更新
+                    'likes': 0,  # 稍后更新
+                    'shares': 0,  # 稍后更新
+                    'favorites': 0,  # 稍后更新
+                    'likes_formatted': '0',
+                    'shares_formatted': '0',
+                    'favorites_formatted': '0',
+                    'comments': [],  # 稍后更新
+                    'data_status': 'pending'  # 标记数据状态
+                } for i in range(current_episode_count)
+            ]

-            if mix_id:
-                logging.info(f'获取合集 {mix_name} 的视频ID')
-                current_episode_count = item.get('updated_to_episode', 0)
-                episode_video_ids = self.get_collection_videos(
-                    mix_id=mix_id,
-                    mix_name=mix_name,
-                    current_episode_count=current_episode_count
-                )
-
-                # 构建每集信息(定时器模式时不获取详细互动数据以提高速度)
-                total_episodes = item.get('updated_to_episode', 0)
-                for i in range(total_episodes):
-                    episode_number = i + 1
-                    video_id = episode_video_ids[i] if i < len(episode_video_ids) else ''
-
-                    episode_info = {
-                        'episode_number': episode_number,
-                        'video_id': video_id,
-                        'likes': 0,  # 定时器模式时不获取详细数据
-                        'shares': 0,
-                        'favorites': 0,
-                        'likes_formatted': '0',
-                        'shares_formatted': '0',
-                        'favorites_formatted': '0',
-                        'comments': []
-                    }
-                    episode_details.append(episode_info)
-
-            # 计算当前排名(基于当前批次的数据)
-            higher_count = self.collection.count_documents({
-                'play_vv': {'$gt': item.get('play_vv', 0)},
-                'batch_time': {'$gte': batch_time.replace(hour=0, minute=0, second=0, microsecond=0)}
-            })
-            current_rank = higher_count + 1
-
-            # 构建文档 - 每次都插入新记录,保留历史数据
+            # 构建基础文档数据
             doc = {
-                'batch_time': batch_time,
+                'batch_id': self.batch_id,
+                'batch_time': self.batch_time,
+                'item_sequence': self.item_sequence,
                 'mix_name': mix_name,
-                'video_url': item.get('video_url', ''),
-                'playcount': item.get('formatted', ''),
-                'play_vv': item.get('play_vv', 0),
-                'request_id': item.get('request_id', ''),
-                'rank': current_rank,
+                'mix_id': mix_id,
+                'video_url': item_data.get('video_url', ''),
+                'playcount': item_data.get('formatted', ''),
+                'play_vv': item_data.get('play_vv', 0),
+                'request_id': item_data.get('request_id', ''),
+                'rank': 0,
                 'cover_image_url_original': original_cover_url,
                 'cover_image_url': permanent_cover_url,
                 'cover_upload_success': upload_success,
-                'cover_backup_urls': item.get('cover_backup_urls', []),
-                'series_author': item.get('series_author', ''),
-                'desc': item.get('desc', ''),
-                'updated_to_episode': item.get('updated_to_episode', 0),
-                'episode_video_ids': episode_video_ids,
+                'cover_backup_urls': item_data.get('cover_backup_urls', []),
+                'series_author': item_data.get('series_author', ''),
+                'desc': item_data.get('desc', ''),
+                'updated_to_episode': current_episode_count,
+                'episode_video_ids': [],  # 稍后更新
                 'episode_details': episode_details,
-                'created_at': datetime.now()
+                'data_status': 'basic_saved',  # 标记为基础信息已保存
+                'realtime_saved': True,
+                'created_at': datetime.now(),
+                'last_updated': datetime.now()
             }

-            # 插入新记录 - 始终插入,不更新已存在的记录
+            # 插入文档
             result = self.collection.insert_one(doc)
-            logging.info(f'边抓取边保存新记录: {mix_name} - {item.get("play_vv", 0):,} 播放量 (排名: {current_rank})')
+            document_id = result.inserted_id

-            # 更新其他记录的排名
-            self.collection.update_many(
-                {
-                    'play_vv': {'$lt': item.get('play_vv', 0)},
-                    'batch_time': {'$gte': batch_time.replace(hour=0, minute=0, second=0, microsecond=0)},
-                    '_id': {'$ne': result.inserted_id}
-                },
-                {'$inc': {'rank': 1}}
-            )
+            # 记录已保存的项目
+            self.saved_items.add(item_key)
+
+            logging.info(f'[立即保存] ✅ 成功保存合集基础信息: {mix_name} (播放量: {item_data.get("play_vv", 0):,}) - 文档ID: {document_id}')
+
+            return document_id
+
+        except Exception as e:
+            logging.error(f'[立即保存] 保存合集基础信息失败: {item_data.get("mix_name", "未知")} - 错误: {e}')
+            return None
+
+    def update_collection_video_ids(self, document_id, mix_id: str, mix_name: str, current_episode_count: int):
+        """更新合集的视频ID列表(第二阶段更新)"""
+        if not self.realtime_save_enabled or self.collection is None or not document_id:
+            return []  # 统一返回列表,便于调用方做真值判断
+
+        try:
+            logging.info(f'[增量更新] 开始获取合集 {mix_name} 的视频ID列表')
+
+            # 获取视频ID列表
+            episode_video_ids = self.get_collection_videos(
+                mix_id=mix_id,
+                mix_name=mix_name,
+                current_episode_count=current_episode_count
+            )
+
+            if episode_video_ids:
+                # 更新数据库中的视频ID列表
+                update_result = self.collection.update_one(
+                    {'_id': document_id},
+                    {
+                        '$set': {
+                            'episode_video_ids': episode_video_ids,
+                            'data_status': 'video_ids_updated',
+                            'last_updated': datetime.now()
+                        }
+                    }
+                )
+
+                if update_result.modified_count > 0:
+                    logging.info(f'[增量更新] ✅ 成功更新视频ID列表: {mix_name} - 共 {len(episode_video_ids)} 个视频')
+                    return episode_video_ids
+                else:
+                    logging.warning(f'[增量更新] 更新视频ID列表失败: {mix_name}')
+            else:
+                logging.warning(f'[增量更新] 未获取到视频ID: {mix_name}')
+
+            return []
+
+        except Exception as e:
+            logging.error(f'[增量更新] 更新视频ID列表失败: {mix_name} - 错误: {e}')
+            return []
+
+    def update_single_video_details(self, document_id, episode_number: int, video_id: str, video_details: dict, mix_name: str):
+        """更新单个视频的详细数据(第三阶段增量更新)"""
+        if not self.realtime_save_enabled or self.collection is None or not document_id:
+            return False
+
+        try:
+            # 构建更新的视频详细信息
+            episode_info = {
+                'episode_number': episode_number,
+                'video_id': video_id,
+                'likes': video_details.get('likes', 0),
+                'shares': video_details.get('shares', 0),
+                'favorites': video_details.get('favorites', 0),
+                'likes_formatted': self.format_interaction_count(video_details.get('likes', 0)),
+                'shares_formatted': self.format_interaction_count(video_details.get('shares', 0)),
+                'favorites_formatted': self.format_interaction_count(video_details.get('favorites', 0)),
+                'comments': video_details.get('comments', []),
+                'data_status': 'completed'
+            }
+
+            # 更新数据库中对应集数的详细信息
+            update_result = self.collection.update_one(
+                {'_id': document_id},
+                {
+                    '$set': {
+                        f'episode_details.{episode_number - 1}': episode_info,
+                        'last_updated': datetime.now()
+                    }
+                }
+            )
+
+            if update_result.modified_count > 0:
+                logging.info(f'[增量更新] ✅ 成功更新第 {episode_number} 集详细数据: {mix_name} - 点赞: {video_details.get("likes", 0):,}, 评论: {len(video_details.get("comments", []))}')
+                return True
+            else:
+                logging.warning(f'[增量更新] 更新第 {episode_number} 集详细数据失败: {mix_name}')
+                return False

         except Exception as e:
-            logging.error(f'实时保存单条数据到MongoDB时出错: {e}')
+            logging.error(f'[增量更新] 更新第 {episode_number} 集详细数据失败: {mix_name} - 错误: {e}')
+            return False
+
+    def update_video_comments_realtime(self, document_id, episode_number: int, new_comments: list = None, mix_name: str = '', interaction_data: dict = None):
+        """实时更新视频评论和互动数据(第四阶段实时更新)"""
+        if not self.realtime_save_enabled or self.collection is None or not document_id:
+            return False
+
+        # 检查是否有数据需要更新
+        if not new_comments and not interaction_data:
+            return False
+
+        try:
+            # 构建更新操作
+            update_operations = {}
+            episode_prefix = f'episode_details.{episode_number - 1}'
+
+            # 处理评论更新
+            if new_comments:
+                update_operations['$push'] = {
+                    f'{episode_prefix}.comments': {'$each': new_comments}
+                }
+
+            # 处理互动数据更新
+            set_fields = {'last_updated': datetime.now()}
+            if interaction_data:
+                # 更新点赞数据
+                if 'likes' in interaction_data:
+                    set_fields[f'{episode_prefix}.likes'] = interaction_data['likes']
+                    set_fields[f'{episode_prefix}.likes_formatted'] = interaction_data.get('likes_formatted', self.format_interaction_count(interaction_data['likes']))
+
+                # 更新分享数据
+                if 'shares' in interaction_data:
+                    set_fields[f'{episode_prefix}.shares'] = interaction_data['shares']
+                    set_fields[f'{episode_prefix}.shares_formatted'] = interaction_data.get('shares_formatted', self.format_interaction_count(interaction_data['shares']))
+
+                # 更新收藏数据
+                if 'favorites' in interaction_data:
+                    set_fields[f'{episode_prefix}.favorites'] = interaction_data['favorites']
+                    set_fields[f'{episode_prefix}.favorites_formatted'] = interaction_data.get('favorites_formatted', self.format_interaction_count(interaction_data['favorites']))
+
+            update_operations['$set'] = set_fields
+
+            # 执行更新
+            update_result = self.collection.update_one(
+                {'_id': document_id},
+                update_operations
+            )
+
+            if update_result.modified_count > 0:
+                # 构建日志信息
+                log_parts = []
+                if new_comments:
+                    log_parts.append(f"追加 {len(new_comments)} 条评论")
+                if interaction_data:
+                    interaction_summary = []
+                    if 'likes' in interaction_data:
+                        interaction_summary.append(f"点赞={interaction_data.get('likes_formatted', interaction_data['likes'])}")
+                    if 'shares' in interaction_data:
+                        interaction_summary.append(f"分享={interaction_data.get('shares_formatted', interaction_data['shares'])}")
+                    if 'favorites' in interaction_data:
+                        interaction_summary.append(f"收藏={interaction_data.get('favorites_formatted', interaction_data['favorites'])}")
+                    if interaction_summary:
+                        log_parts.append(f"更新互动数据({', '.join(interaction_summary)})")
+
+                logging.info(f'[实时更新] ✅ 成功{", ".join(log_parts)}: {mix_name} 第 {episode_number} 集')
+                return True
+            else:
+                logging.warning(f'[实时更新] 更新失败: {mix_name} 第 {episode_number} 集')
+                return False
+
+        except Exception as e:
+            logging.error(f'[实时更新] 更新失败: {mix_name} 第 {episode_number} 集 - 错误: {e}')
+            return False
+
+    def save_single_item_realtime(self, item_data: dict):
+        """分阶段实时保存合集数据(新版本)"""
+        logging.info(f'[分阶段保存] 开始处理合集: {item_data.get("mix_name", "未知")}')
+
+        # 第一阶段:立即保存基础信息
+        document_id = self.save_collection_basic_info(item_data)
+        if not document_id:
+            return False
+
+        # 第二阶段:获取并更新视频ID列表
+        mix_id = item_data.get('mix_id', '')
+        mix_name = item_data.get('mix_name', '')
+        current_episode_count = item_data.get('updated_to_episode', 0)
+
+        if mix_id and current_episode_count > 0:
+            episode_video_ids = self.update_collection_video_ids(document_id, mix_id, mix_name, current_episode_count)
+
+            # 第三阶段:逐个获取并更新视频详细数据
+            if episode_video_ids:
+                self.update_video_details_incrementally(document_id, episode_video_ids, mix_name)
+
+        return True
+
+    def update_video_details_incrementally(self, document_id, episode_video_ids: list, mix_name: str):
+        """增量更新视频详细数据"""
+        logging.info(f'[增量更新] 开始逐个获取视频详细数据: {mix_name}')
+
+        for i, video_id in enumerate(episode_video_ids, 1):
+            if not video_id:
+                logging.warning(f'[增量更新] 第 {i} 集视频ID为空,跳过: {mix_name}')
+                continue
+
+            try:
+                # 获取单个视频的详细数据
+                logging.info(f'[增量更新] 获取第 {i}/{len(episode_video_ids)} 集视频详细数据: {mix_name}')
+                video_details = self.get_video_details(video_id, mix_name, document_id, i)
+
+                if video_details and video_details.get('success', False):
+                    # 立即更新到数据库
+                    self.update_single_video_details(document_id, i, video_id, video_details, mix_name)
+                else:
+                    logging.warning(f'[增量更新] 第 {i} 集视频详细数据获取失败: {mix_name}')
+
+            except Exception as e:
+                logging.error(f'[增量更新] 处理第 {i} 集视频时出错: {mix_name} - {e}')
+                continue

     def get_video_info(self, video_id: str) -> dict:
         """获取视频详细信息
@@ -1479,11 +1756,6 @@ class DouyinPlayVVScraper:
             logging.info(f'定时器模式:跳过 get_collection_videos 函数')
             return []

-        # 定时器模式下跳过此函数
-        if os.environ.get('TIMER_MODE') == '1':
-            logging.info(f'定时器模式:跳过 get_collection_videos 函数')
-            return []
-
         try:
             # 检查缓存文件
             cache_dir = os.path.join(os.path.dirname(__file__), 'episode_video_ids')
@@ -1627,7 +1899,8 @@ class DouyinPlayVVScraper:
                 return [video['video_id'] for video in cached_videos]
         return []

-    def _simulate_comment_scrolling(self, video_id: str, max_scroll_attempts: int = 10, scroll_delay: float = 2.0) -> list:
+    def _simulate_comment_scrolling(self, video_id: str, max_scroll_attempts: int = 10, scroll_delay: float = 2.0,
+                                    document_id=None, episode_number: int = 0, mix_name: str = '') -> list:
         """
         模拟用户异步滑动机制,向上滑动加载更多评论
         Args:
@@ -1637,9 +1910,9 @@ class DouyinPlayVVScraper:
         Returns:
             list: 收集到的所有评论数据
         """
-        # 检查AUTO_CONTINUE环境变量,如果设置为'1'则跳过评论滑动
-        if os.environ.get('AUTO_CONTINUE') == '1' or self.auto_continue:
-            logging.info(f'🚀 AUTO_CONTINUE模式:跳过视频 {video_id} 的评论滑动加载')
+        # 检查是否应该跳过评论滑动(仅在定时器模式下跳过)
+        if should_skip_function('scroll_comments'):
+            logging.info(f'🚀 定时器模式:跳过视频 {video_id} 的评论滑动加载')
             return []

         all_comments = []
@@ -1668,7 +1941,8 @@ class DouyinPlayVVScraper:
                 scroll_future = executor.submit(self._async_scroll_task_with_state, max_scroll_attempts, scroll_delay, shared_state)

                 # 同时提交监控任务 - 监控任务会检测滑动任务状态
-                monitor_future = executor.submit(self._async_monitor_task_with_state, video_id, collected_comment_ids, shared_state, 3600)
+                monitor_future = executor.submit(self._async_monitor_task_with_state, video_id, collected_comment_ids, shared_state, 3600,
+                                                 document_id, episode_number, mix_name)

                 # 等待两个任务完成
                 scroll_result = scroll_future.result()
@@ -1953,7 +2227,8 @@ class DouyinPlayVVScraper:

         return all_comments

-    def _async_monitor_task_with_state(self, video_id: str, collected_comment_ids: set, shared_state: dict, timeout: float) -> list:
+    def _async_monitor_task_with_state(self, video_id: str, collected_comment_ids: set, shared_state: dict, timeout: float,
+                                       document_id=None, episode_number: int = 0, mix_name: str = '') -> list:
         """带状态的异步监控任务 - 监控评论并检测滑动任务状态"""
         all_comments = []
         start_time = time.time()
@@ -1989,11 +2264,17 @@ class DouyinPlayVVScraper:
                     new_comments = self._extract_comments_from_network_logs(video_id)

                     # 去重并添加新评论
+                    new_comments_to_save = []
                     for comment in new_comments:
                         comment_id = f"{comment.get('text', '')}_{comment.get('user_name', '')}"
                         if comment_id not in collected_comment_ids:
                             collected_comment_ids.add(comment_id)
                             all_comments.append(comment)
+                            new_comments_to_save.append(comment)
+
+                    # 实时保存新评论到数据库
+                    if new_comments_to_save and document_id and episode_number > 0:
+                        self.update_video_comments_realtime(document_id, episode_number, new_comments_to_save, mix_name)

                     # 检查是否有新评论
                     current_comment_count = len(all_comments)
@@ -2292,11 +2573,10 @@ class DouyinPlayVVScraper:

         return comments

-    def get_video_details(self, video_id: str, max_comments: int = 100) -> dict:
+    def get_video_details(self, video_id: str, mix_name: str = '', document_id=None, episode_number: int = 0) -> dict:
         """获取单个视频的详细互动数据
         Args:
             video_id: 视频ID
-            max_comments: 最大评论数量,默认100条
         Returns:
             dict: 包含点赞数、收藏数、转发数、评论内容的字典
         """
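Reviewer note: the new methods above form a staged pipeline: save a skeleton document, backfill `episode_video_ids`, then stream per-episode details and comments into the same document. A stub-based sketch of just the control flow (the stage callables are stand-ins, not the real methods):

```python
# Stage 1: skeleton doc (data_status='basic_saved'); stage 2: video IDs;
# stage 3: per-episode details. Stage 4 (comments/interactions) then streams
# into the same document as the scrolling monitor collects data.
def staged_save(item: dict, save_basic, fetch_video_ids, update_episode) -> bool:
    doc_id = save_basic(item)
    if not doc_id:
        return False
    for episode_number, video_id in enumerate(fetch_video_ids(item), 1):
        update_episode(doc_id, episode_number, video_id)
    return True

ok = staged_save(
    {"mix_id": "m1", "updated_to_episode": 2},
    save_basic=lambda item: "doc-1",
    fetch_video_ids=lambda item: ["v1", "v2"],
    update_episode=lambda d, n, v: None,
)
assert ok
```

A nice property of this design is that a crash mid-run leaves partially filled documents whose `data_status` field says exactly which stage they reached.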
@@ -2313,14 +2593,17 @@ class DouyinPlayVVScraper:
             'error': None
         }

-        # 检查AUTO_CONTINUE环境变量,如果设置为'1'则跳过详细数据获取
-        if os.environ.get('AUTO_CONTINUE') == '1' or self.auto_continue:
-            logging.info(f'🚀 AUTO_CONTINUE模式:跳过视频 {video_id} 的详细数据获取(点赞、收藏、分享、评论)')
+        # 添加互动数据保存标记,避免重复保存
+        interaction_data_saved = False
+
+        # 检查是否应该跳过详细数据获取(仅在定时器模式下跳过)
+        if os.environ.get('TIMER_MODE') == '1' and os.environ.get('AUTO_CONTINUE') == '1':
+            logging.info(f'🚀 定时器模式:跳过视频 {video_id} 的详细数据获取(点赞、收藏、分享、评论)')
             video_details['success'] = True
-            video_details['error'] = 'AUTO_CONTINUE模式:跳过详细数据获取'
+            video_details['error'] = '定时器模式:跳过详细数据获取'
             return video_details

-        logging.info(f'🔍 get_video_details 被调用: video_id={video_id}, max_comments={max_comments}')
+        logging.info(f'🔍 get_video_details 被调用: video_id={video_id}')

         try:
             # 确保driver已初始化
@@ -2391,6 +2674,20 @@ class DouyinPlayVVScraper:
                                 video_details['favorites_formatted'] = self.format_interaction_count(video_details['favorites'])

                                 logging.info(f'从初始网络日志获取视频 {video_id} 互动数据: 点赞={video_details["likes_formatted"]}, 分享={video_details["shares_formatted"]}, 收藏={video_details["favorites_formatted"]}')
+
+                                # 实时保存互动数据(仅在首次获取时保存)
+                                if document_id and episode_number and not interaction_data_saved:
+                                    interaction_data = {
+                                        'likes': video_details['likes'],
+                                        'likes_formatted': video_details['likes_formatted'],
+                                        'shares': video_details['shares'],
+                                        'shares_formatted': video_details['shares_formatted'],
+                                        'favorites': video_details['favorites'],
+                                        'favorites_formatted': video_details['favorites_formatted']
+                                    }
+                                    self.update_video_comments_realtime(document_id, episode_number, None, mix_name, interaction_data)
+                                    interaction_data_saved = True
+
                                 break

                 except Exception as e:
@@ -2402,11 +2699,12 @@ class DouyinPlayVVScraper:

             # 启动滑动机制加载更多评论
             logging.info(f'开始为视频 {video_id} 启动滑动机制加载评论')
-            scrolled_comments = self._simulate_comment_scrolling(video_id, max_scroll_attempts=15, scroll_delay=2.0)
+            scrolled_comments = self._simulate_comment_scrolling(video_id, max_scroll_attempts=15, scroll_delay=2.0,
+                                                                 document_id=document_id, episode_number=episode_number, mix_name=mix_name)

             # 如果滑动机制获取到评论,直接使用
             if scrolled_comments:
-                video_details['comments'] = scrolled_comments[:max_comments]
+                video_details['comments'] = scrolled_comments
                 logging.info(f'滑动机制成功获取 {len(video_details["comments"])} 条评论')

             # 获取滑动后的网络请求日志(用于评论数据)
@@ -2439,7 +2737,7 @@ class DouyinPlayVVScraper:

                         # 只有在滑动机制没有获取到评论时才使用这个方法
                         if not video_details['comments']:
-                            for comment in comments[:max_comments]:
+                            for comment in comments:
                                 comment_info = {
                                     'text': comment.get('text', ''),
                                     'user_name': comment.get('user', {}).get('nickname', ''),
@@ -2460,7 +2758,7 @@ class DouyinPlayVVScraper:

             # 如果网络日志没有获取到数据,尝试页面解析
             if video_details['likes'] == 0 and video_details['shares'] == 0 and video_details['favorites'] == 0:
-                video_details = self._parse_video_details_from_page(video_id, video_details, max_comments)
+                video_details = self._parse_video_details_from_page(video_id, video_details, document_id, episode_number, mix_name, interaction_data_saved)

             video_details['success'] = True
             return video_details
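Reviewer note: the realtime writers target individual array slots via dotted paths (`episode_details.<index>`), combining `$set` for interaction counters with `$push`/`$each` for comment batches. A minimal runnable sketch of that update shape, assuming a local MongoDB and the collection name from this diff:

```python
# Dotted-path update against one episode slot; doc shape is a reduced
# stand-in for the real episode_details entries.
from datetime import datetime
from pymongo import MongoClient

coll = MongoClient("mongodb://localhost:27017")["kemeng_media"]["Rankings_list"]
doc_id = coll.insert_one({"episode_details": [{"comments": [], "likes": 0}]}).inserted_id

episode_prefix = "episode_details.0"  # episode_number - 1
coll.update_one(
    {"_id": doc_id},
    {
        "$push": {f"{episode_prefix}.comments": {"$each": [{"text": "demo"}]}},
        "$set": {f"{episode_prefix}.likes": 42, "last_updated": datetime.now()},
    },
)
```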
@@ -2471,12 +2769,15 @@ class DouyinPlayVVScraper:
             video_details['error'] = error_msg
             return video_details

-    def _parse_video_details_from_page(self, video_id: str, video_details: dict, max_comments: int = 20) -> dict:
+    def _parse_video_details_from_page(self, video_id: str, video_details: dict, document_id: str = None, episode_number: int = 0, mix_name: str = "", interaction_data_saved: bool = False) -> dict:
         """从页面元素解析视频详细数据(备用方案)
         Args:
             video_id: 视频ID
             video_details: 现有的视频详细数据字典
-            max_comments: 最大评论数量
+            document_id: 文档ID
+            episode_number: 集数
+            mix_name: 合集名称
+            interaction_data_saved: 互动数据是否已保存
         Returns:
             dict: 更新后的视频详细数据字典
         """
@@ -2529,6 +2830,20 @@ class DouyinPlayVVScraper:
                             video_details['favorites_formatted'] = self.format_interaction_count(video_details['favorites'])

                             logging.info(f'从SSR数据解析到视频 {video_id} 互动数据: 点赞={video_details["likes_formatted"]}, 分享={video_details["shares_formatted"]}, 收藏={video_details["favorites_formatted"]}')
+
+                            # 实时保存互动数据(仅在首次获取时保存)
+                            if document_id and episode_number and not interaction_data_saved:
+                                interaction_data = {
+                                    'likes': video_details['likes'],
+                                    'likes_formatted': video_details['likes_formatted'],
+                                    'shares': video_details['shares'],
+                                    'shares_formatted': video_details['shares_formatted'],
+                                    'favorites': video_details['favorites'],
+                                    'favorites_formatted': video_details['favorites_formatted']
+                                }
+                                self.update_video_comments_realtime(document_id, episode_number, None, mix_name, interaction_data)
+                                interaction_data_saved = True
+
                             break

                 except Exception as e:
@@ -2581,6 +2896,19 @@ class DouyinPlayVVScraper:

                 logging.info(f'从页面元素解析到视频 {video_id} 互动数据: 点赞={video_details["likes_formatted"]}, 分享={video_details["shares_formatted"]}, 收藏={video_details["favorites_formatted"]}')

+                # 实时保存互动数据(仅在首次获取时保存)
+                if document_id and episode_number and not interaction_data_saved:
+                    interaction_data = {
+                        'likes': video_details['likes'],
+                        'likes_formatted': video_details['likes_formatted'],
+                        'shares': video_details['shares'],
+                        'shares_formatted': video_details['shares_formatted'],
+                        'favorites': video_details['favorites'],
+                        'favorites_formatted': video_details['favorites_formatted']
+                    }
+                    self.update_video_comments_realtime(document_id, episode_number, None, mix_name, interaction_data)
+                    interaction_data_saved = True
+
             except Exception as e:
                 logging.warning(f'CSS选择器解析失败: {e}')

@@ -2600,7 +2928,7 @@ class DouyinPlayVVScraper:

             for selector in comment_selectors:
                 try:
-                    comment_elements = self.driver.find_elements("css selector", selector)[:max_comments]
+                    comment_elements = self.driver.find_elements("css selector", selector)
                     if comment_elements:
                         for element in comment_elements:
                             try:
@@ -2630,18 +2958,17 @@ class DouyinPlayVVScraper:

         return video_details

-    def get_collection_video_details(self, episode_video_ids: list, mix_name: str = '', max_comments_per_video: int = 100) -> list:
+    def get_collection_video_details(self, episode_video_ids: list, mix_name: str = '') -> list:
         """获取合集中所有视频的详细互动数据
         Args:
             episode_video_ids: 视频ID列表
             mix_name: 合集名称,用于日志
-            max_comments_per_video: 每个视频最大评论数量,默认100条
         Returns:
             list: 包含每个视频详细数据的列表
         """
-        # AUTO_CONTINUE模式下跳过此函数
-        if os.environ.get('AUTO_CONTINUE') == '1' or self.auto_continue:
-            logging.info(f'🚀 AUTO_CONTINUE模式:跳过 get_collection_video_details 函数(合集视频详细数据获取)')
+        # 检查是否应该跳过此函数(仅在定时器模式下跳过)
+        if should_skip_function('get_collection_video_details'):
+            logging.info(f'🚀 定时器模式:跳过 get_collection_video_details 函数(合集视频详细数据获取)')
             return []

         if not episode_video_ids:
@@ -2671,7 +2998,7 @@ class DouyinPlayVVScraper:

             try:
                 # 获取单个视频的详细数据
-                video_details = self.get_video_details(video_id, max_comments_per_video)
+                video_details = self.get_video_details(video_id)
                 video_details['episode_number'] = i
                 video_details_list.append(video_details)
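Reviewer note: expensive steps are now skipped through `should_skip_function`, which only activates when both `TIMER_MODE` and `AUTO_CONTINUE` are `'1'` and the function name is in the config skip list. A standalone sketch of that gate (the skip-list provider here is a stand-in for `config.get_skip_functions`):

```python
# Skip heavy steps only when both timer-mode env flags are set AND the
# function name is listed; otherwise everything runs normally.
import os

def should_skip_function(name: str, get_skip_functions=lambda: {"scroll_comments"}) -> bool:
    if os.environ.get("TIMER_MODE") == "1" and os.environ.get("AUTO_CONTINUE") == "1":
        return name in get_skip_functions()
    return False

os.environ.update(TIMER_MODE="1", AUTO_CONTINUE="1")
assert should_skip_function("scroll_comments")
assert not should_skip_function("get_collection_video_details", lambda: set())
```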
@@ -2717,17 +3044,8 @@ class DouyinPlayVVScraper:
             self.collect_network_bodies()
             self.parse_ssr_data()
             self.dedupe()
-
-            # 根据模式选择保存方式
-            is_timer_mode = os.environ.get('TIMER_MODE') == '1'
-            if is_timer_mode:
-                # 定时器模式:使用批量保存,所有数据使用相同的batch_time
-                self.save_results()
-                logging.info('定时器模式:完成批量保存,play_vv数量: %d', len(self.play_vv_items))
-            else:
-                # 普通模式:数据已通过实时保存功能保存
-                logging.info('普通模式:完成,play_vv数量: %d', len(self.play_vv_items))
-                logging.info('所有数据已通过实时保存功能保存到数据库')
+            self.save_results()
+            logging.info('完成,play_vv数量: %d', len(self.play_vv_items))
         finally:
             if self.driver:
                 try:
@@ -2736,19 +3054,47 @@ class DouyinPlayVVScraper:
                     pass


+def apply_timer_config():
+    """应用定时器配置中的环境变量"""
+    try:
+        # 应用定时器环境变量配置
+        config.apply_timer_environment()
+
+        # 记录设置的环境变量
+        for key, value in config.TIMER_ENV_CONFIG.items():
+            logging.info(f'设置环境变量: {key}={value}')
+
+    except Exception as e:
+        logging.warning(f'应用定时器配置失败: {e}')
+
+
+def should_skip_function(function_name):
+    """检查是否应该跳过指定的函数 - 只在定时器模式下启用"""
+    try:
+        # 只有在定时器模式下才检查跳过逻辑
+        if os.environ.get('TIMER_MODE') == '1' and os.environ.get('AUTO_CONTINUE') == '1':
+            skip_functions = config.get_skip_functions()
+            return function_name in skip_functions
+    except Exception as e:
+        logging.warning(f'检查跳过函数配置失败: {e}')
+
+    return False
+
+
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(description='Selenium+CDP 抖音play_vv抓取器')
     parser.add_argument('--url', default='https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation', help='收藏合集列表页面URL')
     parser.add_argument('--auto', action='store_true', help='自动继续,跳过回车等待')
     parser.add_argument('--duration', type=int, default=60, help='网络响应收集时长(秒)')
     parser.add_argument('--driver', help='覆盖chromedriver路径')
+    parser.add_argument('--timer', action='store_true', help='启用定时器模式,应用config.py中的定时器配置')
     args = parser.parse_args()

+    # 如果启用定时器模式,应用定时器配置
+    if args.timer:
+        apply_timer_config()
+
     if args.driver:
         os.environ['OVERRIDE_CHROMEDRIVER'] = args.driver
-    if args.auto:
-        os.environ['AUTO_CONTINUE'] = '1'
-
+    # 注意:AUTO_CONTINUE 环境变量只在定时器模式下通过 apply_timer_config() 设置
+    # 普通模式下不设置任何环境变量,所有函数都正常运行
+
     print('=== Selenium+CDP 抖音play_vv抓取器 ===')
     scraper = DouyinPlayVVScraper(args.url, auto_continue=args.auto, duration_s=args.duration)
     scraper.run()
\ No newline at end of file
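Reviewer note: this diff calls `config.apply_timer_environment()`, `config.TIMER_ENV_CONFIG`, and `config.get_skip_functions()`, none of which appear in the config.py hunk above. A hypothetical sketch of the minimum config.py would need to provide — the names match the call sites, but the values and skip-list entries are illustrative only:

```python
# Hypothetical config.py additions this diff depends on (not in the diff).
import os

TIMER_ENV_CONFIG = {
    "TIMER_MODE": "1",
    "AUTO_CONTINUE": "1",
}

# Heavy steps skipped in timer mode; names must match should_skip_function call sites.
SKIP_FUNCTIONS = {"scroll_comments", "get_collection_video_details"}

def apply_timer_environment():
    """Write the timer-mode flags into os.environ."""
    for key, value in TIMER_ENV_CONFIG.items():
        os.environ[key] = value

def get_skip_functions():
    return SKIP_FUNCTIONS
```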