From bba47d2fe95f318d35a986a17aff2bdbc97ed5e5 Mon Sep 17 00:00:00 2001 From: qiaoyirui0819 <3160533978@qq.com> Date: Sat, 25 Oct 2025 19:41:24 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=AD=A3=E9=94=99=E8=AF=AF=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../handlers/Rankings/rank_data_scraper.py | 1131 ++++++++++++++++- 1 file changed, 1130 insertions(+), 1 deletion(-) diff --git a/backend/handlers/Rankings/rank_data_scraper.py b/backend/handlers/Rankings/rank_data_scraper.py index c0a6cd2..35b65f0 100644 --- a/backend/handlers/Rankings/rank_data_scraper.py +++ b/backend/handlers/Rankings/rank_data_scraper.py @@ -89,6 +89,9 @@ class DouyinPlayVVScraper: # 使用 database.py 中的连接 self.db = db + # 根据运行模式选择集合 + is_timer_mode = os.environ.get('TIMER_MODE') == '1' + mongo_collection = 'Ranking_storage_list' if is_timer_mode else 'Rankings_list' # 根据运行模式选择集合 is_timer_mode = os.environ.get('TIMER_MODE') == '1' mongo_collection = 'Ranking_storage_list' if is_timer_mode else 'Rankings_list' @@ -96,6 +99,7 @@ class DouyinPlayVVScraper: logging.info(f'MongoDB连接成功,使用数据库: {self.db.name},集合: {mongo_collection}') logging.info(f'当前运行模式: {"定时器模式" if is_timer_mode else "普通模式"}') + logging.info(f'当前运行模式: {"定时器模式" if is_timer_mode else "普通模式"}') except Exception as e: logging.error(f'MongoDB连接失败: {e}') @@ -453,8 +457,62 @@ class DouyinPlayVVScraper: if n >= 10_000: return f"{n/10_000:.1f}万" return str(n) + + def format_interaction_count(self, n: int) -> str: + """格式化互动数据数量,返回带单位的字符串 + Args: + n: 数量 + Returns: + str: 格式化后的字符串,如 27898 -> 2.8W, 1234 -> 1234 + """ + if n >= 100_000_000: + result = n / 100_000_000 + if result == int(result): + return f"{int(result)}亿" + else: + return f"{result:.1f}亿" + elif n >= 10_000: + result = n / 10_000 + if result == int(result): + return f"{int(result)}W" + else: + return f"{result:.1f}W" + else: + return str(n) - + def save_comments_to_file(self, comments: list, video_id: str = None): + """简单保存评论数据到JSON文件""" + try: + if not comments: + return None + + # 创建保存目录 + script_dir = os.path.dirname(os.path.abspath(__file__)) + save_dir = os.path.join(script_dir, 'saved_comments') + os.makedirs(save_dir, exist_ok=True) + + # 生成文件名 + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + filename = f'comments_{video_id}_{timestamp}.json' if video_id else f'comments_{timestamp}.json' + file_path = os.path.join(save_dir, filename) + + # 保存数据 + save_data = { + 'timestamp': datetime.now().isoformat(), + 'video_id': video_id, + 'total_comments': len(comments), + 'comments': comments + } + + with open(file_path, 'w', encoding='utf-8') as f: + json.dump(save_data, f, ensure_ascii=False, indent=2) + + logging.info(f'保存 {len(comments)} 条评论到: {file_path}') + return file_path + + except Exception as e: + logging.error(f'保存评论失败: {e}') + return None def parse_play_vv_from_text(self, text: str, source_url: str, request_id: str = None): """解析文本中的play_vv、mix_name和watched_item信息""" @@ -1175,6 +1233,7 @@ class DouyinPlayVVScraper: 'Network.responseReceived' in log['method'] and 'response' in log['params'] and log['params']['response'] + and log['params']['response'] and 'url' in log['params']['response'] and '/web/api/v2/aweme/iteminfo' in log['params']['response']['url'] ): @@ -1215,6 +1274,11 @@ class DouyinPlayVVScraper: logging.info(f'定时器模式:跳过 get_collection_videos 函数') return [] + # 定时器模式下跳过此函数 + if os.environ.get('TIMER_MODE') == '1': + logging.info(f'定时器模式:跳过 get_collection_videos 函数') + return [] + try: # 检查缓存文件 cache_dir = os.path.join(os.path.dirname(__file__), 'episode_video_ids') @@ -1358,6 +1422,1071 @@ class DouyinPlayVVScraper: return [video['video_id'] for video in cached_videos] return [] + def _simulate_comment_scrolling(self, video_id: str, max_scroll_attempts: int = 10, scroll_delay: float = 2.0) -> list: + """ + 模拟用户异步滑动机制,向上滑动加载更多评论 + Args: + video_id: 视频ID + max_scroll_attempts: 最大滑动尝试次数,默认10次 + scroll_delay: 每次滑动后的延迟时间(秒),默认2秒 + Returns: + list: 收集到的所有评论数据 + """ + all_comments = [] + collected_comment_ids = set() + + try: + logging.info(f'开始为视频 {video_id} 执行评论滑动加载机制') + + # 等待页面加载完成 + time.sleep(3) + + # 定位评论区域 + self._scroll_to_comment_section() + + # 点击评论区域以触发网络请求 + self._click_comment_area() + + # 使用线程池实现异步滑动和监控 + from concurrent.futures import ThreadPoolExecutor + import threading + + # 创建共享状态对象,用于任务间通信 + shared_state = { + 'scroll_completed': False, + 'lock': threading.Lock() + } + + with ThreadPoolExecutor(max_workers=2) as executor: + # 提交滑动任务 + scroll_future = executor.submit(self._async_scroll_task_with_state, max_scroll_attempts, scroll_delay, shared_state) + + # 同时提交监控任务 - 监控任务会检测滑动任务状态 + monitor_future = executor.submit(self._async_monitor_task_with_state, video_id, collected_comment_ids, shared_state, 3600) + + # 等待两个任务完成 + scroll_result = scroll_future.result() + monitor_comments = monitor_future.result() + + all_comments.extend(monitor_comments) + + logging.info(f'评论滑动加载完成,共收集到 {len(all_comments)} 条评论') + + # 保存评论到文件 + if all_comments: + self.save_comments_to_file(all_comments, video_id) + + return all_comments + + except Exception as e: + logging.error(f'评论滑动加载机制执行失败: {e}') + return all_comments + + + + def _async_scroll_task_with_state(self, max_attempts: int, scroll_delay: float, shared_state: dict): + """带状态的异步滑动任务 - 无限滑动直到检测到"暂时没有更多评论"文本""" + try: + consecutive_no_progress = 0 # 连续无进展次数 + attempt = 0 + + logging.info('开始无限滑动,直到检测到"暂时没有更多评论"') + + while True: # 无限循环,直到检测到底部文本 + attempt += 1 + logging.info(f'第 {attempt} 次向上滑动') + + # 记录滑动前的位置 + current_position = self.driver.execute_script("return window.pageYOffset;") + + # 执行向上滑动(加载更多评论) + self._execute_upward_scroll(attempt) + + # 等待新内容加载 + time.sleep(scroll_delay) + + # 优先检查是否到达底部(检测到"暂时没有更多评论"文本) + if self._check_comment_section_bottom(): + logging.info('检测到"暂时没有更多评论",停止滑动') + break + + # 检查滑动是否有效果 + new_position = self.driver.execute_script("return window.pageYOffset;") + if abs(new_position - current_position) < 50: # 滑动距离太小 + consecutive_no_progress += 1 + logging.debug(f'滑动进展较小,连续无进展次数: {consecutive_no_progress}') + + # 如果连续多次无进展,增加滑动力度 + if consecutive_no_progress >= 5: + logging.info('连续多次滑动无进展,增加滑动力度') + self._execute_force_scroll() + consecutive_no_progress = 0 # 重置计数器 + time.sleep(scroll_delay * 2) # 增加等待时间 + + # 再次检查是否到达底部 + if self._check_comment_section_bottom(): + logging.info('强制滑动后检测到底部,停止滑动') + break + else: + consecutive_no_progress = 0 + + # 每50次滑动输出一次进度信息 + if attempt % 50 == 0: + logging.info(f'已完成 {attempt} 次滑动,继续寻找"暂时没有更多评论"文本') + + # 安全机制:如果滑动次数过多,暂停一下 + if attempt % 200 == 0: + logging.info(f'已滑动 {attempt} 次,暂停5秒以避免过度请求') + time.sleep(5) + + # 滑动任务完成,通知监控任务 + with shared_state['lock']: + shared_state['scroll_completed'] = True + logging.info('滑动任务已完成,通知监控任务结束') + + except Exception as e: + logging.warning(f'滑动任务出错: {e}') + # 即使出错也要通知监控任务结束 + with shared_state['lock']: + shared_state['scroll_completed'] = True + + def _execute_force_scroll(self): + """执行强制滑动,用于突破可能的滑动阻塞""" + try: + logging.info('执行强制滑动以突破阻塞') + + # 执行多重强制滑动策略 + self.driver.execute_script(""" + // 1. 多次大幅度滑动 + for (let i = 0; i < 5; i++) { + window.scrollBy(0, 1000); + document.documentElement.scrollTop += 1000; + document.body.scrollTop += 1000; + } + + // 2. 滑动到页面最底部 + window.scrollTo(0, document.body.scrollHeight); + + // 3. 强制滚动所有容器 + const containers = document.querySelectorAll('[data-e2e="comment-list"], .comment-list, [class*="comment"], [class*="scroll"]'); + containers.forEach(container => { + if (container.scrollTop !== undefined) { + container.scrollTop = container.scrollHeight; + container.dispatchEvent(new Event('scroll', { bubbles: true })); + } + }); + + // 4. 触发所有滚动相关事件 + ['scroll', 'wheel', 'touchmove', 'resize', 'load'].forEach(eventType => { + window.dispatchEvent(new Event(eventType, { bubbles: true })); + document.dispatchEvent(new Event(eventType, { bubbles: true })); + }); + + // 5. 模拟用户交互 + document.body.click(); + + console.log('执行强制滑动完成'); + """) + + time.sleep(3) # 增加等待时间 + + # 再次滑动到底部确保效果 + self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") + time.sleep(1) + + logging.debug('强制滑动操作完成') + + except Exception as e: + logging.warning(f'执行强制滑动失败: {e}') + + def _execute_upward_scroll(self, attempt: int): + """执行向上滑动操作 - 使用强力滑动策略确保有效触发懒加载""" + try: + # 记录滑动前状态 + before_state = self.driver.execute_script(""" + return { + scrollTop: window.pageYOffset, + commentCount: document.querySelectorAll('[data-e2e="comment-item"], [class*="comment"], .comment-item').length, + pageHeight: document.documentElement.scrollHeight + }; + """) + + logging.debug(f'滑动前状态: 位置={before_state["scrollTop"]}px, 评论数={before_state["commentCount"]}条') + + # 计算滑动距离,递增以确保效果 + scroll_distance = 800 + (attempt * 300) + + # 执行强力滚动 - 参考111.py的实现 + self.driver.execute_script(f""" + // 1. 强制滚动页面 + window.scrollBy(0, {scroll_distance}); + document.documentElement.scrollTop += {scroll_distance}; + document.body.scrollTop += {scroll_distance}; + + // 2. 滚动到页面底部(触发懒加载) + window.scrollTo(0, document.body.scrollHeight); + + // 3. 查找并滚动所有可能的评论容器 + const containers = document.querySelectorAll('[data-e2e="comment-list"], .comment-list, [class*="comment"], [class*="scroll"], [role="main"]'); + containers.forEach(container => {{ + if (container.scrollTop !== undefined) {{ + container.scrollTop = container.scrollHeight; + container.dispatchEvent(new Event('scroll', {{ bubbles: true }})); + }} + }}); + + // 4. 触发所有相关事件 + ['scroll', 'wheel', 'touchmove', 'resize'].forEach(eventType => {{ + window.dispatchEvent(new Event(eventType, {{ bubbles: true }})); + document.dispatchEvent(new Event(eventType, {{ bubbles: true }})); + }}); + + // 5. 模拟用户交互 + document.body.click(); + + console.log('执行强力滚动:', {scroll_distance}, 'px'); + """) + + time.sleep(2) # 等待页面响应 + + # 尝试点击加载更多按钮(如果存在) + try: + button_clicked = self.driver.execute_script(""" + const selectors = [ + '[data-e2e="comment-load-more"]', + '[class*="load-more"]', + '[class*="more-comment"]', + 'button[class*="load"]', + 'div[class*="load"]' + ]; + + for (let selector of selectors) { + const buttons = document.querySelectorAll(selector); + for (let button of buttons) { + if (button.offsetParent !== null && !button.disabled) { + button.click(); + console.log('点击了加载更多按钮:', selector); + return true; + } + } + } + return false; + """) + + if button_clicked: + logging.debug('成功点击了加载更多按钮') + time.sleep(1) # 等待按钮响应 + + except Exception as e: + logging.debug(f'点击加载更多按钮失败: {e}') + + # 每隔几次使用真实手势滑动 + if attempt % 3 == 0: + self._simulate_real_swipe() + + logging.debug(f'执行强力滑动,距离: {scroll_distance}px') + + except Exception as e: + logging.warning(f'执行滑动操作失败: {e}') + + def _simulate_real_swipe(self): + """模拟真实向上滑动手势 - 手指从下往上移动""" + try: + + window_size = self.driver.get_window_size() + width = window_size['width'] + height = window_size['height'] + + # 向上滑动手势:手指从屏幕下方往上方移动 + start_x = width // 2 + random.randint(-20, 20) # 增加随机性 + start_y = height * 4 // 5 # 从更靠下的位置开始(4/5处) + end_y = height // 5 # 到更靠上的位置结束(1/5处) + + # 使用ActionChains模拟真实向上滑动手势 + actions = ActionChains(self.driver) + actions.w3c_actions.pointer_action\ + .move_to_location(start_x, start_y)\ + .pointer_down()\ + .pause(0.1)\ + .move_to_location(start_x, end_y)\ + .pause(0.1)\ + .pointer_up() + actions.perform() + + logging.debug(f'执行真实向上滑动手势: 从({start_x}, {start_y})到({start_x}, {end_y})') + + except Exception as e: + logging.debug(f'真实手势滑动失败: {e}') + + def _async_monitor_task(self, video_id: str, collected_comment_ids: set, timeout: float) -> list: + """异步监控任务""" + all_comments = [] + start_time = time.time() + + while time.time() - start_time < timeout: + try: + # 从网络日志获取新评论 + new_comments = self._extract_comments_from_network_logs(video_id) + + # 去重并添加新评论 + for comment in new_comments: + comment_id = f"{comment.get('text', '')}_{comment.get('user_name', '')}" + if comment_id not in collected_comment_ids: + collected_comment_ids.add(comment_id) + all_comments.append(comment) + + if new_comments: + logging.info(f'监控到 {len(new_comments)} 条新评论,总计 {len(all_comments)} 条') + + # 短暂等待后继续监控 + time.sleep(1) + + except Exception as e: + logging.warning(f'监控任务出错: {e}') + time.sleep(2) + + return all_comments + + def _async_monitor_task_with_state(self, video_id: str, collected_comment_ids: set, shared_state: dict, timeout: float) -> list: + """带状态的异步监控任务 - 监控评论并检测滑动任务状态""" + all_comments = [] + start_time = time.time() + last_comment_count = 0 + no_new_comments_count = 0 + + logging.info('开始监控评论,将持续到滑动任务完成') + + while time.time() - start_time < timeout: + try: + # 检查滑动任务是否完成 + with shared_state['lock']: + scroll_completed = shared_state['scroll_completed'] + + if scroll_completed: + logging.info('检测到滑动任务已完成,监控任务即将结束') + # 滑动完成后再监控5秒,确保收集到最后的评论 + final_start = time.time() + while time.time() - final_start < 5: + try: + new_comments = self._extract_comments_from_network_logs(video_id) + for comment in new_comments: + comment_id = f"{comment.get('text', '')}_{comment.get('user_name', '')}" + if comment_id not in collected_comment_ids: + collected_comment_ids.add(comment_id) + all_comments.append(comment) + time.sleep(0.5) + except Exception as e: + logging.warning(f'最终监控阶段出错: {e}') + break + + # 从网络日志获取新评论 + new_comments = self._extract_comments_from_network_logs(video_id) + + # 去重并添加新评论 + for comment in new_comments: + comment_id = f"{comment.get('text', '')}_{comment.get('user_name', '')}" + if comment_id not in collected_comment_ids: + collected_comment_ids.add(comment_id) + all_comments.append(comment) + + # 检查是否有新评论 + current_comment_count = len(all_comments) + if current_comment_count > last_comment_count: + logging.info(f'监控到 {current_comment_count - last_comment_count} 条新评论,总计 {current_comment_count} 条') + last_comment_count = current_comment_count + no_new_comments_count = 0 + else: + no_new_comments_count += 1 + # 每30秒输出一次状态 + if no_new_comments_count % 30 == 0: + logging.info(f'监控中...当前总计 {current_comment_count} 条评论,等待滑动任务完成') + + # 短暂等待后继续监控 + time.sleep(1) + + except Exception as e: + logging.warning(f'监控任务出错: {e}') + time.sleep(2) + + logging.info(f'监控任务结束,共收集到 {len(all_comments)} 条评论') + return all_comments + + def _scroll_to_comment_section(self): + """滚动到评论区域""" + try: + comment_section_selectors = [ + '[data-e2e="comment-list"]', + '[class*="comment-list"]', + '[class*="comment-container"]', + ] + + for selector in comment_section_selectors: + try: + elements = self.driver.find_elements("css selector", selector) + if elements: + self.driver.execute_script( + "arguments[0].scrollIntoView({behavior: 'smooth', block: 'center'});", + elements[0] + ) + time.sleep(2) + logging.info(f'成功定位到评论区域: {selector}') + return + except Exception: + continue + + # 备用方案:滚动到页面底部 + self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") + time.sleep(2) + logging.info('使用备用方案:滚动到页面底部') + + except Exception as e: + logging.warning(f'定位评论区域失败: {e}') + + def _click_comment_area(self): + """ + 点击评论区域以触发网络请求,确保能够获取到评论数据 + """ + try: + # 多种评论区域选择器 + comment_selectors = [ + '[data-e2e="comment-list"]', + '[class*="comment"]', + '[class*="Comment"]', + '.comment-list', + '.comment-container', + '[data-e2e="comment-item"]', + '[class*="comment-item"]', + 'div[class*="comment"]', + # 抖音特有的评论区域选择器 + 'div[data-e2e="comment-list"]', + 'div[class*="CommentList"]', + 'div[class*="comment-list"]' + ] + + clicked = False + for selector in comment_selectors: + try: + elements = self.driver.find_elements("css selector", selector) + if elements: + for element in elements: + try: + if element.is_displayed() and element.is_enabled(): + # 滚动到元素可见 + self.driver.execute_script("arguments[0].scrollIntoView(true);", element) + time.sleep(0.5) + + # 点击元素 + element.click() + logging.info(f'成功点击评论区域: {selector}') + clicked = True + time.sleep(1) # 等待网络请求触发 + break + except Exception as e: + logging.debug(f'点击元素失败: {e}') + continue + if clicked: + break + except Exception as e: + logging.debug(f'使用选择器 {selector} 查找评论区域失败: {e}') + continue + + if not clicked: + # 如果没有找到特定的评论区域,尝试点击页面中部区域 + try: + window_size = self.driver.get_window_size() + center_x = window_size['width'] // 2 + center_y = window_size['height'] // 2 + + # 使用JavaScript点击页面中部 + self.driver.execute_script(f""" + var element = document.elementFromPoint({center_x}, {center_y}); + if (element) {{ + element.click(); + }} + """) + logging.info('点击页面中部区域以触发评论加载') + time.sleep(1) + except Exception as e: + logging.debug(f'点击页面中部失败: {e}') + + except Exception as e: + logging.warning(f'点击评论区域失败: {e}') + + def _check_comment_section_bottom(self) -> bool: + """ + 检测是否已经到达评论区底部 + 只有检测到"暂时没有更多评论"文本时才停止滑动,确保无限滑动直到真正到达底部 + Returns: + bool: True表示已到达底部,False表示还可以继续加载 + """ + try: + # 目标文本:只有检测到这个文本才认为到达底部 + target_text = "暂时没有更多评论" + + logging.debug(f'正在检测评论区底部标识文本: "{target_text}"') + + # 方法1: 使用XPath检测包含文本的元素 + xpath_selectors = [ + f"//*[contains(text(), '{target_text}')]", + f"//div[contains(text(), '{target_text}')]", + f"//span[contains(text(), '{target_text}')]", + f"//p[contains(text(), '{target_text}')]", + f"//*[text()='{target_text}']" + ] + + for xpath in xpath_selectors: + try: + elements = self.driver.find_elements("xpath", xpath) + if elements: + # 检查元素是否可见 + for element in elements: + try: + if element.is_displayed(): + logging.info(f'检测到评论区底部标识文本: "{target_text}" (通过XPath: {xpath})') + return True + except Exception: + continue + except Exception as e: + logging.debug(f'XPath检测失败 {xpath}: {e}') + continue + + # 方法2: 使用JavaScript在页面中搜索文本 + try: + js_result = self.driver.execute_script(f""" + // 搜索页面中所有包含目标文本的元素 + var targetText = '{target_text}'; + var walker = document.createTreeWalker( + document.body, + NodeFilter.SHOW_TEXT, + null, + false + ); + + var node; + while (node = walker.nextNode()) {{ + if (node.textContent.includes(targetText)) {{ + var element = node.parentElement; + if (element && element.offsetParent !== null) {{ + return {{ + found: true, + text: node.textContent.trim(), + tagName: element.tagName, + className: element.className + }}; + }} + }} + }} + return {{found: false}}; + """) + + if js_result and js_result.get('found'): + logging.info(f'通过JavaScript检测到评论区底部标识文本: "{target_text}"') + logging.debug(f'元素信息: 标签={js_result.get("tagName")}, 类名={js_result.get("className")}, 文本="{js_result.get("text")}"') + return True + + except Exception as e: + logging.debug(f'JavaScript文本检测失败: {e}') + + # 方法3: 检查页面源码中是否包含完整的目标文本 + try: + page_source = self.driver.page_source + if target_text in page_source: + # 进一步验证:使用正则表达式确保是完整的文本匹配 + pattern = re.escape(target_text) + if re.search(pattern, page_source): + logging.info(f'在页面源码中检测到完整的底部标识文本: "{target_text}"') + return True + + except Exception as e: + logging.debug(f'页面源码检测失败: {e}') + + # 检查页面滚动位置(仅用于调试信息) + try: + current_position = self.driver.execute_script("return window.pageYOffset;") + page_height = self.driver.execute_script("return document.body.scrollHeight;") + window_height = self.driver.execute_script("return window.innerHeight;") + distance_to_bottom = page_height - (current_position + window_height) + + logging.debug(f'滚动状态: 当前位置={current_position}, 页面高度={page_height}, 窗口高度={window_height}, 距离底部={distance_to_bottom}px') + + # 即使滚动到底部,也不停止滑动,除非检测到目标文本 + if distance_to_bottom <= 10: + logging.debug(f'已滚动到页面底部,但未检测到"{target_text}"文本,继续滑动') + + except Exception as e: + logging.debug(f'检查滚动位置失败: {e}') + + # 只有检测到"暂时没有更多评论"文本才返回True,否则继续滑动 + logging.debug(f'未检测到"{target_text}"文本,继续滑动') + return False + + except Exception as e: + logging.warning(f'检测评论区底部失败: {e}') + return False + + def _extract_comments_from_network_logs(self, video_id: str) -> list: + """ + 从网络日志中提取评论数据 + Args: + video_id: 视频ID + Returns: + list: 评论数据列表 + """ + comments = [] + try: + # 获取网络请求日志 + logs = self.driver.get_log('performance') + + for entry in logs: + try: + log = json.loads(entry['message'])['message'] + if ( + 'Network.responseReceived' in log['method'] + and 'response' in log['params'] + and log['params']['response'] + and log['params']['response'].get('url') + ): + url = log['params']['response']['url'] + + # 检查是否是评论API + if '/aweme/v1/web/comment/list/' in url and video_id in url: + try: + # 获取响应体 + response_body = self.driver.execute_cdp_cmd( + 'Network.getResponseBody', + {'requestId': log['params']['requestId']} + ) + + if response_body and 'body' in response_body: + data = json.loads(response_body['body']) + api_comments = data.get('comments', []) + + for comment in api_comments: + comment_info = { + 'text': comment.get('text', ''), + 'user_name': comment.get('user', {}).get('nickname', ''), + 'digg_count': int(comment.get('digg_count', 0)), + 'create_time': comment.get('create_time', 0) + } + comments.append(comment_info) + + # 记录API URL信息,用于调试 + if api_comments: + logging.debug(f'从API获取到 {len(api_comments)} 条评论: {url}') + + except Exception as e: + logging.debug(f'解析评论API响应失败: {e}') + continue + + except Exception as e: + continue + + except Exception as e: + logging.warning(f'提取网络日志评论数据失败: {e}') + + return comments + + def get_video_details(self, video_id: str, max_comments: int = 100) -> dict: + """获取单个视频的详细互动数据 + Args: + video_id: 视频ID + max_comments: 最大评论数量,默认100条 + Returns: + dict: 包含点赞数、收藏数、转发数、评论内容的字典 + """ + video_details = { + 'video_id': video_id, + 'likes': 0, + 'shares': 0, + 'favorites': 0, + 'likes_formatted': '0', + 'shares_formatted': '0', + 'favorites_formatted': '0', + 'comments': [], + 'success': False, + 'error': None + } + + try: + # 确保driver已初始化 + if self.driver is None: + logging.info('Driver未初始化,正在设置...') + self.setup_driver() + if self.driver is None: + raise Exception("无法初始化WebDriver") + + video_url = f'https://www.douyin.com/video/{video_id}' + logging.info(f'获取视频详细数据: {video_url}') + + # 导航到视频页面 + self.driver.get(video_url) + time.sleep(3) + + # 等待页面加载完成 + try: + from selenium.webdriver.support.ui import WebDriverWait + from selenium.webdriver.support import expected_conditions as EC + from selenium.webdriver.common.by import By + + WebDriverWait(self.driver, 10).until( + EC.presence_of_element_located((By.TAG_NAME, "video")) + ) + except Exception as e: + logging.warning(f'等待视频元素超时: {e}') + + # 首先获取页面加载时的网络请求日志(关键修复) + logging.info(f'获取页面加载时的网络日志以捕获视频详情API') + initial_logs = self.driver.get_log('performance') + + # 解析初始网络日志获取视频详细数据cc + for entry in initial_logs: + try: + log = json.loads(entry['message'])['message'] + if ( + 'Network.responseReceived' in log['method'] + and 'response' in log['params'] + and log['params']['response'] + and log['params']['response'].get('url') + ): + url = log['params']['response']['url'] + + # 检查是否是视频详情API + if '/aweme/v1/web/aweme/detail/' in url and video_id in url: + try: + # 获取响应体 + response_body = self.driver.execute_cdp_cmd( + 'Network.getResponseBody', + {'requestId': log['params']['requestId']} + ) + + if response_body and 'body' in response_body: + data = json.loads(response_body['body']) + aweme_detail = data.get('aweme_detail', {}) + + if aweme_detail: + # 获取统计数据 + statistics = aweme_detail.get('statistics', {}) + video_details['likes'] = int(statistics.get('digg_count', 0)) + video_details['shares'] = int(statistics.get('share_count', 0)) + video_details['favorites'] = int(statistics.get('collect_count', 0)) + + # 添加格式化字段 + video_details['likes_formatted'] = self.format_interaction_count(video_details['likes']) + video_details['shares_formatted'] = self.format_interaction_count(video_details['shares']) + video_details['favorites_formatted'] = self.format_interaction_count(video_details['favorites']) + + logging.info(f'从初始网络日志获取视频 {video_id} 互动数据: 点赞={video_details["likes_formatted"]}, 分享={video_details["shares_formatted"]}, 收藏={video_details["favorites_formatted"]}') + break + + except Exception as e: + logging.warning(f'解析初始视频详情API响应失败: {e}') + continue + + except Exception as e: + continue + + # 启动滑动机制加载更多评论 + logging.info(f'开始为视频 {video_id} 启动滑动机制加载评论') + scrolled_comments = self._simulate_comment_scrolling(video_id, max_scroll_attempts=15, scroll_delay=2.0) + + # 如果滑动机制获取到评论,直接使用 + if scrolled_comments: + video_details['comments'] = scrolled_comments[:max_comments] + logging.info(f'滑动机制成功获取 {len(video_details["comments"])} 条评论') + + # 获取滑动后的网络请求日志(用于评论数据) + logs = self.driver.get_log('performance') + + # 解析滑动后的网络日志获取评论数据(作为滑动机制的补充) + for entry in logs: + try: + log = json.loads(entry['message'])['message'] + if ( + 'Network.responseReceived' in log['method'] + and 'response' in log['params'] + and log['params']['response'] + and log['params']['response'].get('url') + ): + url = log['params']['response']['url'] + + # 只处理评论API(视频详情API已在初始阶段处理) + if '/aweme/v1/web/comment/list/' in url and video_id in url and not video_details['comments']: + try: + # 获取响应体 + response_body = self.driver.execute_cdp_cmd( + 'Network.getResponseBody', + {'requestId': log['params']['requestId']} + ) + + if response_body and 'body' in response_body: + data = json.loads(response_body['body']) + comments = data.get('comments', []) + + # 只有在滑动机制没有获取到评论时才使用这个方法 + if not video_details['comments']: + for comment in comments[:max_comments]: + comment_info = { + 'text': comment.get('text', ''), + 'user_name': comment.get('user', {}).get('nickname', ''), + 'digg_count': int(comment.get('digg_count', 0)), + 'create_time': comment.get('create_time', 0) + } + video_details['comments'].append(comment_info) + + logging.info(f'备用方案获取到 {len(comments)} 条评论') + logging.info(f'评论API URL: {url}') + + except Exception as e: + logging.warning(f'解析评论API响应失败: {e}') + continue + + except Exception as e: + continue + + # 如果网络日志没有获取到数据,尝试页面解析 + if video_details['likes'] == 0 and video_details['shares'] == 0 and video_details['favorites'] == 0: + video_details = self._parse_video_details_from_page(video_id, video_details, max_comments) + + video_details['success'] = True + return video_details + + except Exception as e: + error_msg = f'获取视频 {video_id} 详细数据失败: {e}' + logging.error(error_msg) + video_details['error'] = error_msg + return video_details + + def _parse_video_details_from_page(self, video_id: str, video_details: dict, max_comments: int = 20) -> dict: + """从页面元素解析视频详细数据(备用方案) + Args: + video_id: 视频ID + video_details: 现有的视频详细数据字典 + max_comments: 最大评论数量 + Returns: + dict: 更新后的视频详细数据字典 + """ + try: + logging.info(f'尝试从页面元素解析视频 {video_id} 的详细数据') + + # 尝试解析页面中的SSR数据 + try: + # 查找包含视频数据的script标签 + scripts = self.driver.find_elements("tag name", "script") + for script in scripts: + script_content = script.get_attribute('innerHTML') + if script_content and ('window._SSR_HYDRATED_DATA' in script_content or 'RENDER_DATA' in script_content): + # 提取JSON数据 + if 'window._SSR_HYDRATED_DATA' in script_content: + match = re.search(r'window\._SSR_HYDRATED_DATA\s*=\s*({.*?});', script_content, re.DOTALL) + else: + match = re.search(r'window\.RENDER_DATA\s*=\s*({.*?});', script_content, re.DOTALL) + + if match: + data = json.loads(match.group(1)) + + # 查找视频详情数据 + def find_video_data(obj, target_id): + if isinstance(obj, dict): + for key, value in obj.items(): + if key == 'aweme_id' and str(value) == str(target_id): + return obj + elif isinstance(value, (dict, list)): + result = find_video_data(value, target_id) + if result: + return result + elif isinstance(obj, list): + for item in obj: + result = find_video_data(item, target_id) + if result: + return result + return None + + video_data = find_video_data(data, video_id) + if video_data: + statistics = video_data.get('statistics', {}) + video_details['likes'] = int(statistics.get('digg_count', 0)) + video_details['shares'] = int(statistics.get('share_count', 0)) + video_details['favorites'] = int(statistics.get('collect_count', 0)) + + # 添加格式化字段 + video_details['likes_formatted'] = self.format_interaction_count(video_details['likes']) + video_details['shares_formatted'] = self.format_interaction_count(video_details['shares']) + video_details['favorites_formatted'] = self.format_interaction_count(video_details['favorites']) + + logging.info(f'从SSR数据解析到视频 {video_id} 互动数据: 点赞={video_details["likes_formatted"]}, 分享={video_details["shares_formatted"]}, 收藏={video_details["favorites_formatted"]}') + break + + except Exception as e: + logging.warning(f'解析SSR数据失败: {e}') + + # 如果SSR数据解析失败,尝试CSS选择器 + if video_details['likes'] == 0 and video_details['shares'] == 0 and video_details['favorites'] == 0: + try: + # 尝试常见的点赞、分享、收藏按钮选择器 + selectors = { + 'likes': [ + '[data-e2e="video-like-count"]', + '[class*="like"] [class*="count"]', + '[class*="digg"] [class*="count"]' + ], + 'shares': [ + '[data-e2e="video-share-count"]', + '[class*="share"] [class*="count"]' + ], + 'favorites': [ + '[data-e2e="video-collect-count"]', + '[class*="collect"] [class*="count"]', + '[class*="favorite"] [class*="count"]' + ] + } + + for data_type, selector_list in selectors.items(): + for selector in selector_list: + try: + elements = self.driver.find_elements("css selector", selector) + if elements: + text = elements[0].text.strip() + if text and text.replace('.', '').replace('万', '').replace('亿', '').isdigit(): + # 转换数字格式 + if '亿' in text: + video_details[data_type] = int(float(text.replace('亿', '')) * 100000000) + elif '万' in text: + video_details[data_type] = int(float(text.replace('万', '')) * 10000) + else: + video_details[data_type] = int(text) + break + except Exception: + continue + + if video_details['likes'] > 0 or video_details['shares'] > 0 or video_details['favorites'] > 0: + # 添加格式化字段 + video_details['likes_formatted'] = self.format_interaction_count(video_details['likes']) + video_details['shares_formatted'] = self.format_interaction_count(video_details['shares']) + video_details['favorites_formatted'] = self.format_interaction_count(video_details['favorites']) + + logging.info(f'从页面元素解析到视频 {video_id} 互动数据: 点赞={video_details["likes_formatted"]}, 分享={video_details["shares_formatted"]}, 收藏={video_details["favorites_formatted"]}') + + except Exception as e: + logging.warning(f'CSS选择器解析失败: {e}') + + # 尝试获取评论(如果还没有获取到) + if not video_details['comments']: + try: + # 滚动到评论区域 + self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") + time.sleep(2) + + # 尝试常见的评论选择器 + comment_selectors = [ + '[data-e2e="comment-item"]', + '[class*="comment-item"]', + '[class*="comment"] [class*="content"]' + ] + + for selector in comment_selectors: + try: + comment_elements = self.driver.find_elements("css selector", selector)[:max_comments] + if comment_elements: + for element in comment_elements: + try: + comment_text = element.text.strip() + if comment_text: + comment_info = { + 'text': comment_text, + 'user_name': '', + 'digg_count': 0, + 'create_time': 0 + } + video_details['comments'].append(comment_info) + except Exception: + continue + + if video_details['comments']: + logging.info(f'从页面元素获取到视频 {video_id} 的 {len(video_details["comments"])} 条评论') + break + except Exception: + continue + + except Exception as e: + logging.warning(f'获取评论失败: {e}') + + except Exception as e: + logging.warning(f'页面解析视频详细数据失败: {e}') + + return video_details + + def get_collection_video_details(self, episode_video_ids: list, mix_name: str = '', max_comments_per_video: int = 100) -> list: + """获取合集中所有视频的详细互动数据 + Args: + episode_video_ids: 视频ID列表 + mix_name: 合集名称,用于日志 + max_comments_per_video: 每个视频最大评论数量,默认100条 + Returns: + list: 包含每个视频详细数据的列表 + """ + # 定时器模式下跳过此函数 + if os.environ.get('TIMER_MODE') == '1': + logging.info(f'定时器模式:跳过 get_collection_video_details 函数') + return [] + + if not episode_video_ids: + logging.info(f'合集 {mix_name} 没有视频ID,跳过详细数据获取') + return [] + + logging.info(f'开始获取合集 {mix_name} 中 {len(episode_video_ids)} 个视频的详细数据') + + video_details_list = [] + + for i, video_id in enumerate(episode_video_ids, 1): + if not video_id: + logging.warning(f'合集 {mix_name} 第 {i} 集视频ID为空,跳过') + video_details_list.append({ + 'episode_number': i, + 'video_id': '', + 'likes': 0, + 'shares': 0, + 'favorites': 0, + 'comments': [], + 'success': False, + 'error': '视频ID为空' + }) + continue + + logging.info(f'获取合集 {mix_name} 第 {i}/{len(episode_video_ids)} 集视频详细数据: {video_id}') + + try: + # 获取单个视频的详细数据 + video_details = self.get_video_details(video_id, max_comments_per_video) + video_details['episode_number'] = i + video_details_list.append(video_details) + + # 添加延迟避免请求过快 + time.sleep(2) + # exit(0) + + except Exception as e: + error_msg = f'获取视频 {video_id} 详细数据时出错: {e}' + logging.error(error_msg) + video_details_list.append({ + 'episode_number': i, + 'video_id': video_id, + 'likes': 0, + 'shares': 0, + 'favorites': 0, + 'comments': [], + 'success': False, + 'error': error_msg + }) + + # 统计获取结果 + success_count = sum(1 for detail in video_details_list if detail.get('success', False)) + total_likes = sum(detail.get('likes', 0) for detail in video_details_list) + total_comments = sum(len(detail.get('comments', [])) for detail in video_details_list) + + logging.info(f'合集 {mix_name} 视频详细数据获取完成: {success_count}/{len(episode_video_ids)} 成功, 总点赞数={total_likes:,}, 总评论数={total_comments}') + + return video_details_list + def get_cookies_dict(self): """获取当前页面的cookies""" if not hasattr(self, 'cookies') or not self.cookies: