diff --git a/backend/Timer_worker.py b/backend/Timer_worker.py index 0a7c182..f50ef09 100644 --- a/backend/Timer_worker.py +++ b/backend/Timer_worker.py @@ -103,12 +103,6 @@ class DouyinAutoScheduler: # 设置环境变量,确保自动模式 os.environ['AUTO_CONTINUE'] = '1' - # 设置定时器模式环境变量,跳过评论抓取等函数 - os.environ['TIMER_MODE'] = '1' - - # 只在定时器模式下设置静默模式(非测试、非单次执行、非仅生成榜单) - if hasattr(self, '_is_timer_mode') and self._is_timer_mode: - os.environ['QUIET_MODE'] = '1' # 直接创建并运行 DouyinPlayVVScraper 实例 scraper = DouyinPlayVVScraper( @@ -116,11 +110,11 @@ class DouyinAutoScheduler: auto_continue=True, duration_s=60 ) - - logging.warning("📁 开始执行抓取任务...") + + logging.info("📁 开始执行抓取任务...") scraper.run() - - logging.warning("✅ 抖音播放量抓取任务执行成功") + + logging.info("✅ 抖音播放量抓取任务执行成功") # 数据抓取完成后,自动生成当日榜单 self.generate_daily_rankings() @@ -168,43 +162,35 @@ class DouyinAutoScheduler: today_videos_raw = list(douyin_collection.find({"batch_time": latest_batch_time}).sort("play_vv", -1)) logging.info(f"📊 最新批次数据数量: {len(today_videos_raw)}") - # 调试:检查原始数据 - if today_videos_raw: - sample_video = today_videos_raw[0] - logging.info(f"🔍 样本数据检查:") - logging.info(f" mix_name: {sample_video.get('mix_name')}") - logging.info(f" play_vv: {sample_video.get('play_vv')} (类型: {type(sample_video.get('play_vv'))})") - logging.info(f" author: {sample_video.get('author')}") + # 按短剧名称去重,每个短剧只保留播放量最高的一条 + unique_videos = {} + for video in today_videos_raw: + mix_name = video.get("mix_name", "") + if mix_name and (mix_name not in unique_videos or video.get("play_vv", 0) > unique_videos[mix_name].get("play_vv", 0)): + unique_videos[mix_name] = video - # 按短剧名称去重并确保数据类型正确 - unique_videos = self._deduplicate_videos_by_mix_name(today_videos_raw, include_rank=False) today_videos = list(unique_videos.values()) logging.info(f"📊 今日数据去重后:{len(today_videos)} 个独特短剧(原始数据:{len(today_videos_raw)} 条)") - # 获取昨天最后一批次的数据 - yesterday_start = datetime(yesterday.year, yesterday.month, yesterday.day) - yesterday_end = yesterday_start + timedelta(days=1) - yesterday_batch = douyin_collection.find_one({ - "batch_time": {"$gte": yesterday_start, "$lt": yesterday_end} - }, sort=[("batch_time", -1)]) + # 获取昨天的榜单数据(如果存在),取最新的计算结果 + yesterday_ranking = rankings_collection.find_one({ + "date": yesterday_str, + "type": "comprehensive" + }, sort=[("calculation_sequence", -1)]) yesterday_data = {} - if yesterday_batch: - # 获取昨天最后一批次的所有数据 - yesterday_videos = list(douyin_collection.find({ - "batch_time": yesterday_batch["batch_time"] - }).sort("play_vv", -1)) - - # 按短剧名称去重,保留播放量最高的记录,并确保数据类型正确 - yesterday_data = self._deduplicate_videos_by_mix_name(yesterday_videos, include_rank=True) - - # 计算排名 - sorted_videos = sorted(yesterday_data.items(), key=lambda x: x[1]["play_vv"], reverse=True) - for rank, (mix_name, data) in enumerate(sorted_videos, 1): - yesterday_data[mix_name]["rank"] = rank - - logging.info(f"📊 找到昨天的原始数据,共 {len(yesterday_data)} 个短剧") + if yesterday_ranking and "data" in yesterday_ranking: + # 将昨天的数据转换为字典,以短剧名称为键 + for item in yesterday_ranking["data"]: + title = item.get("title", "") + if title: + yesterday_data[title] = { + "rank": item.get("rank", 0), + "play_vv": item.get("play_vv", 0), + "video_id": item.get("video_id", "") + } + logging.info(f"📊 找到昨天的榜单数据,共 {len(yesterday_data)} 个短剧") else: logging.info("📊 未找到昨天的原始数据,将作为首次生成") diff --git a/backend/handlers/Rankings/rank_data_scraper.py b/backend/handlers/Rankings/rank_data_scraper.py index 249bec3..c0a6cd2 100644 --- a/backend/handlers/Rankings/rank_data_scraper.py +++ b/backend/handlers/Rankings/rank_data_scraper.py @@ -453,62 +453,8 @@ class DouyinPlayVVScraper: if n >= 10_000: return f"{n/10_000:.1f}万" return str(n) - - def format_interaction_count(self, n: int) -> str: - """格式化互动数据数量,返回带单位的字符串 - Args: - n: 数量 - Returns: - str: 格式化后的字符串,如 27898 -> 2.8W, 1234 -> 1234 - """ - if n >= 100_000_000: - result = n / 100_000_000 - if result == int(result): - return f"{int(result)}亿" - else: - return f"{result:.1f}亿" - elif n >= 10_000: - result = n / 10_000 - if result == int(result): - return f"{int(result)}W" - else: - return f"{result:.1f}W" - else: - return str(n) - def save_comments_to_file(self, comments: list, video_id: str = None): - """简单保存评论数据到JSON文件""" - try: - if not comments: - return None - - # 创建保存目录 - script_dir = os.path.dirname(os.path.abspath(__file__)) - save_dir = os.path.join(script_dir, 'saved_comments') - os.makedirs(save_dir, exist_ok=True) - - # 生成文件名 - timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') - filename = f'comments_{video_id}_{timestamp}.json' if video_id else f'comments_{timestamp}.json' - file_path = os.path.join(save_dir, filename) - - # 保存数据 - save_data = { - 'timestamp': datetime.now().isoformat(), - 'video_id': video_id, - 'total_comments': len(comments), - 'comments': comments - } - - with open(file_path, 'w', encoding='utf-8') as f: - json.dump(save_data, f, ensure_ascii=False, indent=2) - - logging.info(f'保存 {len(comments)} 条评论到: {file_path}') - return file_path - - except Exception as e: - logging.error(f'保存评论失败: {e}') - return None + def parse_play_vv_from_text(self, text: str, source_url: str, request_id: str = None): """解析文本中的play_vv、mix_name和watched_item信息""" @@ -1412,1071 +1358,6 @@ class DouyinPlayVVScraper: return [video['video_id'] for video in cached_videos] return [] - def _simulate_comment_scrolling(self, video_id: str, max_scroll_attempts: int = 10, scroll_delay: float = 2.0) -> list: - """ - 模拟用户异步滑动机制,向上滑动加载更多评论 - Args: - video_id: 视频ID - max_scroll_attempts: 最大滑动尝试次数,默认10次 - scroll_delay: 每次滑动后的延迟时间(秒),默认2秒 - Returns: - list: 收集到的所有评论数据 - """ - all_comments = [] - collected_comment_ids = set() - - try: - logging.info(f'开始为视频 {video_id} 执行评论滑动加载机制') - - # 等待页面加载完成 - time.sleep(3) - - # 定位评论区域 - self._scroll_to_comment_section() - - # 点击评论区域以触发网络请求 - self._click_comment_area() - - # 使用线程池实现异步滑动和监控 - from concurrent.futures import ThreadPoolExecutor - import threading - - # 创建共享状态对象,用于任务间通信 - shared_state = { - 'scroll_completed': False, - 'lock': threading.Lock() - } - - with ThreadPoolExecutor(max_workers=2) as executor: - # 提交滑动任务 - scroll_future = executor.submit(self._async_scroll_task_with_state, max_scroll_attempts, scroll_delay, shared_state) - - # 同时提交监控任务 - 监控任务会检测滑动任务状态 - monitor_future = executor.submit(self._async_monitor_task_with_state, video_id, collected_comment_ids, shared_state, 3600) - - # 等待两个任务完成 - scroll_result = scroll_future.result() - monitor_comments = monitor_future.result() - - all_comments.extend(monitor_comments) - - logging.info(f'评论滑动加载完成,共收集到 {len(all_comments)} 条评论') - - # 保存评论到文件 - if all_comments: - self.save_comments_to_file(all_comments, video_id) - - return all_comments - - except Exception as e: - logging.error(f'评论滑动加载机制执行失败: {e}') - return all_comments - - - - def _async_scroll_task_with_state(self, max_attempts: int, scroll_delay: float, shared_state: dict): - """带状态的异步滑动任务 - 无限滑动直到检测到"暂时没有更多评论"文本""" - try: - consecutive_no_progress = 0 # 连续无进展次数 - attempt = 0 - - logging.info('开始无限滑动,直到检测到"暂时没有更多评论"') - - while True: # 无限循环,直到检测到底部文本 - attempt += 1 - logging.info(f'第 {attempt} 次向上滑动') - - # 记录滑动前的位置 - current_position = self.driver.execute_script("return window.pageYOffset;") - - # 执行向上滑动(加载更多评论) - self._execute_upward_scroll(attempt) - - # 等待新内容加载 - time.sleep(scroll_delay) - - # 优先检查是否到达底部(检测到"暂时没有更多评论"文本) - if self._check_comment_section_bottom(): - logging.info('检测到"暂时没有更多评论",停止滑动') - break - - # 检查滑动是否有效果 - new_position = self.driver.execute_script("return window.pageYOffset;") - if abs(new_position - current_position) < 50: # 滑动距离太小 - consecutive_no_progress += 1 - logging.debug(f'滑动进展较小,连续无进展次数: {consecutive_no_progress}') - - # 如果连续多次无进展,增加滑动力度 - if consecutive_no_progress >= 5: - logging.info('连续多次滑动无进展,增加滑动力度') - self._execute_force_scroll() - consecutive_no_progress = 0 # 重置计数器 - time.sleep(scroll_delay * 2) # 增加等待时间 - - # 再次检查是否到达底部 - if self._check_comment_section_bottom(): - logging.info('强制滑动后检测到底部,停止滑动') - break - else: - consecutive_no_progress = 0 - - # 每50次滑动输出一次进度信息 - if attempt % 50 == 0: - logging.info(f'已完成 {attempt} 次滑动,继续寻找"暂时没有更多评论"文本') - - # 安全机制:如果滑动次数过多,暂停一下 - if attempt % 200 == 0: - logging.info(f'已滑动 {attempt} 次,暂停5秒以避免过度请求') - time.sleep(5) - - # 滑动任务完成,通知监控任务 - with shared_state['lock']: - shared_state['scroll_completed'] = True - logging.info('滑动任务已完成,通知监控任务结束') - - except Exception as e: - logging.warning(f'滑动任务出错: {e}') - # 即使出错也要通知监控任务结束 - with shared_state['lock']: - shared_state['scroll_completed'] = True - - def _execute_force_scroll(self): - """执行强制滑动,用于突破可能的滑动阻塞""" - try: - logging.info('执行强制滑动以突破阻塞') - - # 执行多重强制滑动策略 - self.driver.execute_script(""" - // 1. 多次大幅度滑动 - for (let i = 0; i < 5; i++) { - window.scrollBy(0, 1000); - document.documentElement.scrollTop += 1000; - document.body.scrollTop += 1000; - } - - // 2. 滑动到页面最底部 - window.scrollTo(0, document.body.scrollHeight); - - // 3. 强制滚动所有容器 - const containers = document.querySelectorAll('[data-e2e="comment-list"], .comment-list, [class*="comment"], [class*="scroll"]'); - containers.forEach(container => { - if (container.scrollTop !== undefined) { - container.scrollTop = container.scrollHeight; - container.dispatchEvent(new Event('scroll', { bubbles: true })); - } - }); - - // 4. 触发所有滚动相关事件 - ['scroll', 'wheel', 'touchmove', 'resize', 'load'].forEach(eventType => { - window.dispatchEvent(new Event(eventType, { bubbles: true })); - document.dispatchEvent(new Event(eventType, { bubbles: true })); - }); - - // 5. 模拟用户交互 - document.body.click(); - - console.log('执行强制滑动完成'); - """) - - time.sleep(3) # 增加等待时间 - - # 再次滑动到底部确保效果 - self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") - time.sleep(1) - - logging.debug('强制滑动操作完成') - - except Exception as e: - logging.warning(f'执行强制滑动失败: {e}') - - def _execute_upward_scroll(self, attempt: int): - """执行向上滑动操作 - 使用强力滑动策略确保有效触发懒加载""" - try: - # 记录滑动前状态 - before_state = self.driver.execute_script(""" - return { - scrollTop: window.pageYOffset, - commentCount: document.querySelectorAll('[data-e2e="comment-item"], [class*="comment"], .comment-item').length, - pageHeight: document.documentElement.scrollHeight - }; - """) - - logging.debug(f'滑动前状态: 位置={before_state["scrollTop"]}px, 评论数={before_state["commentCount"]}条') - - # 计算滑动距离,递增以确保效果 - scroll_distance = 800 + (attempt * 300) - - # 执行强力滚动 - 参考111.py的实现 - self.driver.execute_script(f""" - // 1. 强制滚动页面 - window.scrollBy(0, {scroll_distance}); - document.documentElement.scrollTop += {scroll_distance}; - document.body.scrollTop += {scroll_distance}; - - // 2. 滚动到页面底部(触发懒加载) - window.scrollTo(0, document.body.scrollHeight); - - // 3. 查找并滚动所有可能的评论容器 - const containers = document.querySelectorAll('[data-e2e="comment-list"], .comment-list, [class*="comment"], [class*="scroll"], [role="main"]'); - containers.forEach(container => {{ - if (container.scrollTop !== undefined) {{ - container.scrollTop = container.scrollHeight; - container.dispatchEvent(new Event('scroll', {{ bubbles: true }})); - }} - }}); - - // 4. 触发所有相关事件 - ['scroll', 'wheel', 'touchmove', 'resize'].forEach(eventType => {{ - window.dispatchEvent(new Event(eventType, {{ bubbles: true }})); - document.dispatchEvent(new Event(eventType, {{ bubbles: true }})); - }}); - - // 5. 模拟用户交互 - document.body.click(); - - console.log('执行强力滚动:', {scroll_distance}, 'px'); - """) - - time.sleep(2) # 等待页面响应 - - # 尝试点击加载更多按钮(如果存在) - try: - button_clicked = self.driver.execute_script(""" - const selectors = [ - '[data-e2e="comment-load-more"]', - '[class*="load-more"]', - '[class*="more-comment"]', - 'button[class*="load"]', - 'div[class*="load"]' - ]; - - for (let selector of selectors) { - const buttons = document.querySelectorAll(selector); - for (let button of buttons) { - if (button.offsetParent !== null && !button.disabled) { - button.click(); - console.log('点击了加载更多按钮:', selector); - return true; - } - } - } - return false; - """) - - if button_clicked: - logging.debug('成功点击了加载更多按钮') - time.sleep(1) # 等待按钮响应 - - except Exception as e: - logging.debug(f'点击加载更多按钮失败: {e}') - - # 每隔几次使用真实手势滑动 - if attempt % 3 == 0: - self._simulate_real_swipe() - - logging.debug(f'执行强力滑动,距离: {scroll_distance}px') - - except Exception as e: - logging.warning(f'执行滑动操作失败: {e}') - - def _simulate_real_swipe(self): - """模拟真实向上滑动手势 - 手指从下往上移动""" - try: - - window_size = self.driver.get_window_size() - width = window_size['width'] - height = window_size['height'] - - # 向上滑动手势:手指从屏幕下方往上方移动 - start_x = width // 2 + random.randint(-20, 20) # 增加随机性 - start_y = height * 4 // 5 # 从更靠下的位置开始(4/5处) - end_y = height // 5 # 到更靠上的位置结束(1/5处) - - # 使用ActionChains模拟真实向上滑动手势 - actions = ActionChains(self.driver) - actions.w3c_actions.pointer_action\ - .move_to_location(start_x, start_y)\ - .pointer_down()\ - .pause(0.1)\ - .move_to_location(start_x, end_y)\ - .pause(0.1)\ - .pointer_up() - actions.perform() - - logging.debug(f'执行真实向上滑动手势: 从({start_x}, {start_y})到({start_x}, {end_y})') - - except Exception as e: - logging.debug(f'真实手势滑动失败: {e}') - - def _async_monitor_task(self, video_id: str, collected_comment_ids: set, timeout: float) -> list: - """异步监控任务""" - all_comments = [] - start_time = time.time() - - while time.time() - start_time < timeout: - try: - # 从网络日志获取新评论 - new_comments = self._extract_comments_from_network_logs(video_id) - - # 去重并添加新评论 - for comment in new_comments: - comment_id = f"{comment.get('text', '')}_{comment.get('user_name', '')}" - if comment_id not in collected_comment_ids: - collected_comment_ids.add(comment_id) - all_comments.append(comment) - - if new_comments: - logging.info(f'监控到 {len(new_comments)} 条新评论,总计 {len(all_comments)} 条') - - # 短暂等待后继续监控 - time.sleep(1) - - except Exception as e: - logging.warning(f'监控任务出错: {e}') - time.sleep(2) - - return all_comments - - def _async_monitor_task_with_state(self, video_id: str, collected_comment_ids: set, shared_state: dict, timeout: float) -> list: - """带状态的异步监控任务 - 监控评论并检测滑动任务状态""" - all_comments = [] - start_time = time.time() - last_comment_count = 0 - no_new_comments_count = 0 - - logging.info('开始监控评论,将持续到滑动任务完成') - - while time.time() - start_time < timeout: - try: - # 检查滑动任务是否完成 - with shared_state['lock']: - scroll_completed = shared_state['scroll_completed'] - - if scroll_completed: - logging.info('检测到滑动任务已完成,监控任务即将结束') - # 滑动完成后再监控5秒,确保收集到最后的评论 - final_start = time.time() - while time.time() - final_start < 5: - try: - new_comments = self._extract_comments_from_network_logs(video_id) - for comment in new_comments: - comment_id = f"{comment.get('text', '')}_{comment.get('user_name', '')}" - if comment_id not in collected_comment_ids: - collected_comment_ids.add(comment_id) - all_comments.append(comment) - time.sleep(0.5) - except Exception as e: - logging.warning(f'最终监控阶段出错: {e}') - break - - # 从网络日志获取新评论 - new_comments = self._extract_comments_from_network_logs(video_id) - - # 去重并添加新评论 - for comment in new_comments: - comment_id = f"{comment.get('text', '')}_{comment.get('user_name', '')}" - if comment_id not in collected_comment_ids: - collected_comment_ids.add(comment_id) - all_comments.append(comment) - - # 检查是否有新评论 - current_comment_count = len(all_comments) - if current_comment_count > last_comment_count: - logging.info(f'监控到 {current_comment_count - last_comment_count} 条新评论,总计 {current_comment_count} 条') - last_comment_count = current_comment_count - no_new_comments_count = 0 - else: - no_new_comments_count += 1 - # 每30秒输出一次状态 - if no_new_comments_count % 30 == 0: - logging.info(f'监控中...当前总计 {current_comment_count} 条评论,等待滑动任务完成') - - # 短暂等待后继续监控 - time.sleep(1) - - except Exception as e: - logging.warning(f'监控任务出错: {e}') - time.sleep(2) - - logging.info(f'监控任务结束,共收集到 {len(all_comments)} 条评论') - return all_comments - - def _scroll_to_comment_section(self): - """滚动到评论区域""" - try: - comment_section_selectors = [ - '[data-e2e="comment-list"]', - '[class*="comment-list"]', - '[class*="comment-container"]', - ] - - for selector in comment_section_selectors: - try: - elements = self.driver.find_elements("css selector", selector) - if elements: - self.driver.execute_script( - "arguments[0].scrollIntoView({behavior: 'smooth', block: 'center'});", - elements[0] - ) - time.sleep(2) - logging.info(f'成功定位到评论区域: {selector}') - return - except Exception: - continue - - # 备用方案:滚动到页面底部 - self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") - time.sleep(2) - logging.info('使用备用方案:滚动到页面底部') - - except Exception as e: - logging.warning(f'定位评论区域失败: {e}') - - def _click_comment_area(self): - """ - 点击评论区域以触发网络请求,确保能够获取到评论数据 - """ - try: - # 多种评论区域选择器 - comment_selectors = [ - '[data-e2e="comment-list"]', - '[class*="comment"]', - '[class*="Comment"]', - '.comment-list', - '.comment-container', - '[data-e2e="comment-item"]', - '[class*="comment-item"]', - 'div[class*="comment"]', - # 抖音特有的评论区域选择器 - 'div[data-e2e="comment-list"]', - 'div[class*="CommentList"]', - 'div[class*="comment-list"]' - ] - - clicked = False - for selector in comment_selectors: - try: - elements = self.driver.find_elements("css selector", selector) - if elements: - for element in elements: - try: - if element.is_displayed() and element.is_enabled(): - # 滚动到元素可见 - self.driver.execute_script("arguments[0].scrollIntoView(true);", element) - time.sleep(0.5) - - # 点击元素 - element.click() - logging.info(f'成功点击评论区域: {selector}') - clicked = True - time.sleep(1) # 等待网络请求触发 - break - except Exception as e: - logging.debug(f'点击元素失败: {e}') - continue - if clicked: - break - except Exception as e: - logging.debug(f'使用选择器 {selector} 查找评论区域失败: {e}') - continue - - if not clicked: - # 如果没有找到特定的评论区域,尝试点击页面中部区域 - try: - window_size = self.driver.get_window_size() - center_x = window_size['width'] // 2 - center_y = window_size['height'] // 2 - - # 使用JavaScript点击页面中部 - self.driver.execute_script(f""" - var element = document.elementFromPoint({center_x}, {center_y}); - if (element) {{ - element.click(); - }} - """) - logging.info('点击页面中部区域以触发评论加载') - time.sleep(1) - except Exception as e: - logging.debug(f'点击页面中部失败: {e}') - - except Exception as e: - logging.warning(f'点击评论区域失败: {e}') - - def _check_comment_section_bottom(self) -> bool: - """ - 检测是否已经到达评论区底部 - 只有检测到"暂时没有更多评论"文本时才停止滑动,确保无限滑动直到真正到达底部 - Returns: - bool: True表示已到达底部,False表示还可以继续加载 - """ - try: - # 目标文本:只有检测到这个文本才认为到达底部 - target_text = "暂时没有更多评论" - - logging.debug(f'正在检测评论区底部标识文本: "{target_text}"') - - # 方法1: 使用XPath检测包含文本的元素 - xpath_selectors = [ - f"//*[contains(text(), '{target_text}')]", - f"//div[contains(text(), '{target_text}')]", - f"//span[contains(text(), '{target_text}')]", - f"//p[contains(text(), '{target_text}')]", - f"//*[text()='{target_text}']" - ] - - for xpath in xpath_selectors: - try: - elements = self.driver.find_elements("xpath", xpath) - if elements: - # 检查元素是否可见 - for element in elements: - try: - if element.is_displayed(): - logging.info(f'检测到评论区底部标识文本: "{target_text}" (通过XPath: {xpath})') - return True - except Exception: - continue - except Exception as e: - logging.debug(f'XPath检测失败 {xpath}: {e}') - continue - - # 方法2: 使用JavaScript在页面中搜索文本 - try: - js_result = self.driver.execute_script(f""" - // 搜索页面中所有包含目标文本的元素 - var targetText = '{target_text}'; - var walker = document.createTreeWalker( - document.body, - NodeFilter.SHOW_TEXT, - null, - false - ); - - var node; - while (node = walker.nextNode()) {{ - if (node.textContent.includes(targetText)) {{ - var element = node.parentElement; - if (element && element.offsetParent !== null) {{ - return {{ - found: true, - text: node.textContent.trim(), - tagName: element.tagName, - className: element.className - }}; - }} - }} - }} - return {{found: false}}; - """) - - if js_result and js_result.get('found'): - logging.info(f'通过JavaScript检测到评论区底部标识文本: "{target_text}"') - logging.debug(f'元素信息: 标签={js_result.get("tagName")}, 类名={js_result.get("className")}, 文本="{js_result.get("text")}"') - return True - - except Exception as e: - logging.debug(f'JavaScript文本检测失败: {e}') - - # 方法3: 检查页面源码中是否包含完整的目标文本 - try: - page_source = self.driver.page_source - if target_text in page_source: - # 进一步验证:使用正则表达式确保是完整的文本匹配 - pattern = re.escape(target_text) - if re.search(pattern, page_source): - logging.info(f'在页面源码中检测到完整的底部标识文本: "{target_text}"') - return True - - except Exception as e: - logging.debug(f'页面源码检测失败: {e}') - - # 检查页面滚动位置(仅用于调试信息) - try: - current_position = self.driver.execute_script("return window.pageYOffset;") - page_height = self.driver.execute_script("return document.body.scrollHeight;") - window_height = self.driver.execute_script("return window.innerHeight;") - distance_to_bottom = page_height - (current_position + window_height) - - logging.debug(f'滚动状态: 当前位置={current_position}, 页面高度={page_height}, 窗口高度={window_height}, 距离底部={distance_to_bottom}px') - - # 即使滚动到底部,也不停止滑动,除非检测到目标文本 - if distance_to_bottom <= 10: - logging.debug(f'已滚动到页面底部,但未检测到"{target_text}"文本,继续滑动') - - except Exception as e: - logging.debug(f'检查滚动位置失败: {e}') - - # 只有检测到"暂时没有更多评论"文本才返回True,否则继续滑动 - logging.debug(f'未检测到"{target_text}"文本,继续滑动') - return False - - except Exception as e: - logging.warning(f'检测评论区底部失败: {e}') - return False - - def _extract_comments_from_network_logs(self, video_id: str) -> list: - """ - 从网络日志中提取评论数据 - Args: - video_id: 视频ID - Returns: - list: 评论数据列表 - """ - comments = [] - try: - # 获取网络请求日志 - logs = self.driver.get_log('performance') - - for entry in logs: - try: - log = json.loads(entry['message'])['message'] - if ( - 'Network.responseReceived' in log['method'] - and 'response' in log['params'] - and log['params']['response'] - and log['params']['response'].get('url') - ): - url = log['params']['response']['url'] - - # 检查是否是评论API - if '/aweme/v1/web/comment/list/' in url and video_id in url: - try: - # 获取响应体 - response_body = self.driver.execute_cdp_cmd( - 'Network.getResponseBody', - {'requestId': log['params']['requestId']} - ) - - if response_body and 'body' in response_body: - data = json.loads(response_body['body']) - api_comments = data.get('comments', []) - - for comment in api_comments: - comment_info = { - 'text': comment.get('text', ''), - 'user_name': comment.get('user', {}).get('nickname', ''), - 'digg_count': int(comment.get('digg_count', 0)), - 'create_time': comment.get('create_time', 0) - } - comments.append(comment_info) - - # 记录API URL信息,用于调试 - if api_comments: - logging.debug(f'从API获取到 {len(api_comments)} 条评论: {url}') - - except Exception as e: - logging.debug(f'解析评论API响应失败: {e}') - continue - - except Exception as e: - continue - - except Exception as e: - logging.warning(f'提取网络日志评论数据失败: {e}') - - return comments - - def get_video_details(self, video_id: str, max_comments: int = 100) -> dict: - """获取单个视频的详细互动数据 - Args: - video_id: 视频ID - max_comments: 最大评论数量,默认100条 - Returns: - dict: 包含点赞数、收藏数、转发数、评论内容的字典 - """ - video_details = { - 'video_id': video_id, - 'likes': 0, - 'shares': 0, - 'favorites': 0, - 'likes_formatted': '0', - 'shares_formatted': '0', - 'favorites_formatted': '0', - 'comments': [], - 'success': False, - 'error': None - } - - try: - # 确保driver已初始化 - if self.driver is None: - logging.info('Driver未初始化,正在设置...') - self.setup_driver() - if self.driver is None: - raise Exception("无法初始化WebDriver") - - video_url = f'https://www.douyin.com/video/{video_id}' - logging.info(f'获取视频详细数据: {video_url}') - - # 导航到视频页面 - self.driver.get(video_url) - time.sleep(3) - - # 等待页面加载完成 - try: - from selenium.webdriver.support.ui import WebDriverWait - from selenium.webdriver.support import expected_conditions as EC - from selenium.webdriver.common.by import By - - WebDriverWait(self.driver, 10).until( - EC.presence_of_element_located((By.TAG_NAME, "video")) - ) - except Exception as e: - logging.warning(f'等待视频元素超时: {e}') - - # 首先获取页面加载时的网络请求日志(关键修复) - logging.info(f'获取页面加载时的网络日志以捕获视频详情API') - initial_logs = self.driver.get_log('performance') - - # 解析初始网络日志获取视频详细数据cc - for entry in initial_logs: - try: - log = json.loads(entry['message'])['message'] - if ( - 'Network.responseReceived' in log['method'] - and 'response' in log['params'] - and log['params']['response'] - and log['params']['response'].get('url') - ): - url = log['params']['response']['url'] - - # 检查是否是视频详情API - if '/aweme/v1/web/aweme/detail/' in url and video_id in url: - try: - # 获取响应体 - response_body = self.driver.execute_cdp_cmd( - 'Network.getResponseBody', - {'requestId': log['params']['requestId']} - ) - - if response_body and 'body' in response_body: - data = json.loads(response_body['body']) - aweme_detail = data.get('aweme_detail', {}) - - if aweme_detail: - # 获取统计数据 - statistics = aweme_detail.get('statistics', {}) - video_details['likes'] = int(statistics.get('digg_count', 0)) - video_details['shares'] = int(statistics.get('share_count', 0)) - video_details['favorites'] = int(statistics.get('collect_count', 0)) - - # 添加格式化字段 - video_details['likes_formatted'] = self.format_interaction_count(video_details['likes']) - video_details['shares_formatted'] = self.format_interaction_count(video_details['shares']) - video_details['favorites_formatted'] = self.format_interaction_count(video_details['favorites']) - - logging.info(f'从初始网络日志获取视频 {video_id} 互动数据: 点赞={video_details["likes_formatted"]}, 分享={video_details["shares_formatted"]}, 收藏={video_details["favorites_formatted"]}') - break - - except Exception as e: - logging.warning(f'解析初始视频详情API响应失败: {e}') - continue - - except Exception as e: - continue - - # 启动滑动机制加载更多评论 - logging.info(f'开始为视频 {video_id} 启动滑动机制加载评论') - scrolled_comments = self._simulate_comment_scrolling(video_id, max_scroll_attempts=15, scroll_delay=2.0) - - # 如果滑动机制获取到评论,直接使用 - if scrolled_comments: - video_details['comments'] = scrolled_comments[:max_comments] - logging.info(f'滑动机制成功获取 {len(video_details["comments"])} 条评论') - - # 获取滑动后的网络请求日志(用于评论数据) - logs = self.driver.get_log('performance') - - # 解析滑动后的网络日志获取评论数据(作为滑动机制的补充) - for entry in logs: - try: - log = json.loads(entry['message'])['message'] - if ( - 'Network.responseReceived' in log['method'] - and 'response' in log['params'] - and log['params']['response'] - and log['params']['response'].get('url') - ): - url = log['params']['response']['url'] - - # 只处理评论API(视频详情API已在初始阶段处理) - if '/aweme/v1/web/comment/list/' in url and video_id in url and not video_details['comments']: - try: - # 获取响应体 - response_body = self.driver.execute_cdp_cmd( - 'Network.getResponseBody', - {'requestId': log['params']['requestId']} - ) - - if response_body and 'body' in response_body: - data = json.loads(response_body['body']) - comments = data.get('comments', []) - - # 只有在滑动机制没有获取到评论时才使用这个方法 - if not video_details['comments']: - for comment in comments[:max_comments]: - comment_info = { - 'text': comment.get('text', ''), - 'user_name': comment.get('user', {}).get('nickname', ''), - 'digg_count': int(comment.get('digg_count', 0)), - 'create_time': comment.get('create_time', 0) - } - video_details['comments'].append(comment_info) - - logging.info(f'备用方案获取到 {len(comments)} 条评论') - logging.info(f'评论API URL: {url}') - - except Exception as e: - logging.warning(f'解析评论API响应失败: {e}') - continue - - except Exception as e: - continue - - # 如果网络日志没有获取到数据,尝试页面解析 - if video_details['likes'] == 0 and video_details['shares'] == 0 and video_details['favorites'] == 0: - video_details = self._parse_video_details_from_page(video_id, video_details, max_comments) - - video_details['success'] = True - return video_details - - except Exception as e: - error_msg = f'获取视频 {video_id} 详细数据失败: {e}' - logging.error(error_msg) - video_details['error'] = error_msg - return video_details - - def _parse_video_details_from_page(self, video_id: str, video_details: dict, max_comments: int = 20) -> dict: - """从页面元素解析视频详细数据(备用方案) - Args: - video_id: 视频ID - video_details: 现有的视频详细数据字典 - max_comments: 最大评论数量 - Returns: - dict: 更新后的视频详细数据字典 - """ - try: - logging.info(f'尝试从页面元素解析视频 {video_id} 的详细数据') - - # 尝试解析页面中的SSR数据 - try: - # 查找包含视频数据的script标签 - scripts = self.driver.find_elements("tag name", "script") - for script in scripts: - script_content = script.get_attribute('innerHTML') - if script_content and ('window._SSR_HYDRATED_DATA' in script_content or 'RENDER_DATA' in script_content): - # 提取JSON数据 - if 'window._SSR_HYDRATED_DATA' in script_content: - match = re.search(r'window\._SSR_HYDRATED_DATA\s*=\s*({.*?});', script_content, re.DOTALL) - else: - match = re.search(r'window\.RENDER_DATA\s*=\s*({.*?});', script_content, re.DOTALL) - - if match: - data = json.loads(match.group(1)) - - # 查找视频详情数据 - def find_video_data(obj, target_id): - if isinstance(obj, dict): - for key, value in obj.items(): - if key == 'aweme_id' and str(value) == str(target_id): - return obj - elif isinstance(value, (dict, list)): - result = find_video_data(value, target_id) - if result: - return result - elif isinstance(obj, list): - for item in obj: - result = find_video_data(item, target_id) - if result: - return result - return None - - video_data = find_video_data(data, video_id) - if video_data: - statistics = video_data.get('statistics', {}) - video_details['likes'] = int(statistics.get('digg_count', 0)) - video_details['shares'] = int(statistics.get('share_count', 0)) - video_details['favorites'] = int(statistics.get('collect_count', 0)) - - # 添加格式化字段 - video_details['likes_formatted'] = self.format_interaction_count(video_details['likes']) - video_details['shares_formatted'] = self.format_interaction_count(video_details['shares']) - video_details['favorites_formatted'] = self.format_interaction_count(video_details['favorites']) - - logging.info(f'从SSR数据解析到视频 {video_id} 互动数据: 点赞={video_details["likes_formatted"]}, 分享={video_details["shares_formatted"]}, 收藏={video_details["favorites_formatted"]}') - break - - except Exception as e: - logging.warning(f'解析SSR数据失败: {e}') - - # 如果SSR数据解析失败,尝试CSS选择器 - if video_details['likes'] == 0 and video_details['shares'] == 0 and video_details['favorites'] == 0: - try: - # 尝试常见的点赞、分享、收藏按钮选择器 - selectors = { - 'likes': [ - '[data-e2e="video-like-count"]', - '[class*="like"] [class*="count"]', - '[class*="digg"] [class*="count"]' - ], - 'shares': [ - '[data-e2e="video-share-count"]', - '[class*="share"] [class*="count"]' - ], - 'favorites': [ - '[data-e2e="video-collect-count"]', - '[class*="collect"] [class*="count"]', - '[class*="favorite"] [class*="count"]' - ] - } - - for data_type, selector_list in selectors.items(): - for selector in selector_list: - try: - elements = self.driver.find_elements("css selector", selector) - if elements: - text = elements[0].text.strip() - if text and text.replace('.', '').replace('万', '').replace('亿', '').isdigit(): - # 转换数字格式 - if '亿' in text: - video_details[data_type] = int(float(text.replace('亿', '')) * 100000000) - elif '万' in text: - video_details[data_type] = int(float(text.replace('万', '')) * 10000) - else: - video_details[data_type] = int(text) - break - except Exception: - continue - - if video_details['likes'] > 0 or video_details['shares'] > 0 or video_details['favorites'] > 0: - # 添加格式化字段 - video_details['likes_formatted'] = self.format_interaction_count(video_details['likes']) - video_details['shares_formatted'] = self.format_interaction_count(video_details['shares']) - video_details['favorites_formatted'] = self.format_interaction_count(video_details['favorites']) - - logging.info(f'从页面元素解析到视频 {video_id} 互动数据: 点赞={video_details["likes_formatted"]}, 分享={video_details["shares_formatted"]}, 收藏={video_details["favorites_formatted"]}') - - except Exception as e: - logging.warning(f'CSS选择器解析失败: {e}') - - # 尝试获取评论(如果还没有获取到) - if not video_details['comments']: - try: - # 滚动到评论区域 - self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") - time.sleep(2) - - # 尝试常见的评论选择器 - comment_selectors = [ - '[data-e2e="comment-item"]', - '[class*="comment-item"]', - '[class*="comment"] [class*="content"]' - ] - - for selector in comment_selectors: - try: - comment_elements = self.driver.find_elements("css selector", selector)[:max_comments] - if comment_elements: - for element in comment_elements: - try: - comment_text = element.text.strip() - if comment_text: - comment_info = { - 'text': comment_text, - 'user_name': '', - 'digg_count': 0, - 'create_time': 0 - } - video_details['comments'].append(comment_info) - except Exception: - continue - - if video_details['comments']: - logging.info(f'从页面元素获取到视频 {video_id} 的 {len(video_details["comments"])} 条评论') - break - except Exception: - continue - - except Exception as e: - logging.warning(f'获取评论失败: {e}') - - except Exception as e: - logging.warning(f'页面解析视频详细数据失败: {e}') - - return video_details - - def get_collection_video_details(self, episode_video_ids: list, mix_name: str = '', max_comments_per_video: int = 100) -> list: - """获取合集中所有视频的详细互动数据 - Args: - episode_video_ids: 视频ID列表 - mix_name: 合集名称,用于日志 - max_comments_per_video: 每个视频最大评论数量,默认100条 - Returns: - list: 包含每个视频详细数据的列表 - """ - # 定时器模式下跳过此函数 - if os.environ.get('TIMER_MODE') == '1': - logging.info(f'定时器模式:跳过 get_collection_video_details 函数') - return [] - - if not episode_video_ids: - logging.info(f'合集 {mix_name} 没有视频ID,跳过详细数据获取') - return [] - - logging.info(f'开始获取合集 {mix_name} 中 {len(episode_video_ids)} 个视频的详细数据') - - video_details_list = [] - - for i, video_id in enumerate(episode_video_ids, 1): - if not video_id: - logging.warning(f'合集 {mix_name} 第 {i} 集视频ID为空,跳过') - video_details_list.append({ - 'episode_number': i, - 'video_id': '', - 'likes': 0, - 'shares': 0, - 'favorites': 0, - 'comments': [], - 'success': False, - 'error': '视频ID为空' - }) - continue - - logging.info(f'获取合集 {mix_name} 第 {i}/{len(episode_video_ids)} 集视频详细数据: {video_id}') - - try: - # 获取单个视频的详细数据 - video_details = self.get_video_details(video_id, max_comments_per_video) - video_details['episode_number'] = i - video_details_list.append(video_details) - - # 添加延迟避免请求过快 - time.sleep(2) - # exit(0) - - except Exception as e: - error_msg = f'获取视频 {video_id} 详细数据时出错: {e}' - logging.error(error_msg) - video_details_list.append({ - 'episode_number': i, - 'video_id': video_id, - 'likes': 0, - 'shares': 0, - 'favorites': 0, - 'comments': [], - 'success': False, - 'error': error_msg - }) - - # 统计获取结果 - success_count = sum(1 for detail in video_details_list if detail.get('success', False)) - total_likes = sum(detail.get('likes', 0) for detail in video_details_list) - total_comments = sum(len(detail.get('comments', [])) for detail in video_details_list) - - logging.info(f'合集 {mix_name} 视频详细数据获取完成: {success_count}/{len(episode_video_ids)} 成功, 总点赞数={total_likes:,}, 总评论数={total_comments}') - - return video_details_list - def get_cookies_dict(self): """获取当前页面的cookies""" if not hasattr(self, 'cookies') or not self.cookies: