diff --git a/backend/handlers/Rankings/rank_data_scraper.py b/backend/handlers/Rankings/rank_data_scraper.py index a682bb5..a71d3ef 100644 --- a/backend/handlers/Rankings/rank_data_scraper.py +++ b/backend/handlers/Rankings/rank_data_scraper.py @@ -28,9 +28,11 @@ import base64 import uuid import sys import psutil +from typing import Dict, List, Optional, Set import random import threading import argparse +from typing import Dict, List, Optional, Set from concurrent.futures import ThreadPoolExecutor # 使用线程池实现异步滑动和监控 from selenium import webdriver @@ -68,6 +70,633 @@ logging.basicConfig( ) +class UnifiedDataCollector: + """统一数据收集器 - 解决数据重复和抓取不全问题""" + + def __init__(self, driver, duration_s: int = 60): + self.driver = driver + self.duration_s = duration_s + + # 统一数据存储 - 按mix_id去重 + self.collected_items: Dict[str, dict] = {} + + # 数据源统计 + self.source_stats = { + 'network': 0, + 'ssr': 0, + 'page': 0, + 'filtered': 0 + } + + # 已知请求ID集合,用于去重 + self.known_request_ids: Set[str] = set() + + # 目标关键词(收藏/合集/视频) + self.url_keywords = ['aweme', 'mix', 'collection', 'favorite', 'note', 'api'] + + # 是否在网络收集过程中周期性触发滚动加载(默认关闭以避免浪费时间) + self.enable_network_scroll: bool = False + + logging.info('统一数据收集器初始化完成') + + def collect_all_data(self) -> List[dict]: + """统一的数据收集入口 - 整合所有数据源""" + logging.info('开始统一数据收集') + + # 重置统计 + self.source_stats = {'network': 0, 'ssr': 0, 'page': 0, 'filtered': 0} + + # 按优先级收集数据 + self._collect_from_network() + self._collect_from_ssr() + self._collect_from_page() + + # 输出统计信息 + self._log_collection_stats() + + return list(self.collected_items.values()) + + def _collect_from_network(self): + """从网络API监控收集数据""" + logging.info('开始网络API数据收集') + start_time = time.time() + last_scroll_time = start_time + + while time.time() - start_time < self.duration_s: + try: + logs = self.driver.get_log('performance') + except Exception as e: + logging.warning(f'获取性能日志失败: {e}') + time.sleep(1) + continue + + for entry in logs: + try: + message = json.loads(entry['message'])['message'] + method = message.get('method') + params = message.get('params', {}) + + # 响应到达,尝试获取响应体 + if method == 'Network.responseReceived': + req_id = params.get('requestId') + url = params.get('response', {}).get('url', '') + type_ = params.get('type') # XHR, Fetch, Document + + if req_id and req_id not in self.known_request_ids: + self.known_request_ids.add(req_id) + + # 仅处理XHR/Fetch + if type_ in ('XHR', 'Fetch') and any(k in url for k in self.url_keywords): + try: + body_obj = self.driver.execute_cdp_cmd('Network.getResponseBody', {'requestId': req_id}) + body_text = body_obj.get('body', '') + + # 可能是base64编码 + if body_obj.get('base64Encoded'): + try: + body_text = base64.b64decode(body_text).decode('utf-8', errors='ignore') + except Exception: + pass + + # 解析数据 + self._parse_and_add_item(body_text, url, req_id, 'network') + + except Exception: + # 某些响应不可获取或过大 + pass + except Exception: + continue + + # 在收集过程中定期触发数据加载(默认关闭) + if self.enable_network_scroll: + current_time = time.time() + if current_time - last_scroll_time > 15: # 降低频率:每15秒 + # 若检测到底部则不再滚动 + if not self._check_no_more_content(): + self._trigger_mini_scroll() + last_scroll_time = current_time + + time.sleep(0.8) + + logging.info(f'网络API数据收集完成,发现 {self.source_stats["network"]} 个有效项') + + def _trigger_mini_scroll(self): + """在数据收集过程中触发滚动加载数据 - 增强版滚动机制""" + try: + logging.info('开始触发滚动加载数据...') + + # 方式1:强力滚动策略 - 模拟真实用户行为 + try: + # 强力滚动:多次大幅度滚动确保触发懒加载 + for i in range(5): + # 计算滚动距离,递增以确保效果 + scroll_distance = 800 + (i * 300) + + # 执行强力滚动 + 
self.driver.execute_script(f""" + // 1. 强制滚动页面 + window.scrollBy(0, {scroll_distance}); + document.documentElement.scrollTop += {scroll_distance}; + document.body.scrollTop += {scroll_distance}; + + // 2. 滚动到页面底部(触发懒加载) + window.scrollTo(0, document.body.scrollHeight); + + // 3. 查找并滚动所有可能的容器 + const containers = document.querySelectorAll('[data-e2e="comment-list"], .comment-list, [class*="comment"], [class*="scroll"], [role="main"]'); + containers.forEach(container => {{ + if (container.scrollTop !== undefined) {{ + container.scrollTop = container.scrollHeight; + container.dispatchEvent(new Event('scroll', {{ bubbles: true }})); + }} + }}); + + // 4. 触发所有相关事件 + ['scroll', 'wheel', 'touchmove', 'resize'].forEach(eventType => {{ + window.dispatchEvent(new Event(eventType, {{ bubbles: true }})); + document.dispatchEvent(new Event(eventType, {{ bubbles: true }})); + }}); + + // 5. 模拟用户交互 + document.body.click(); + + console.log('执行强力滚动:', {scroll_distance}, 'px'); + """) + + logging.info(f'第{i+1}次强力滚动,距离: {scroll_distance}px') + time.sleep(2) # 等待数据加载 + + # 检查是否有新数据加载 + current_height = self.driver.execute_script("return document.body.scrollHeight;") + logging.info(f'当前页面高度: {current_height}px') + + # 检查是否到达底部 + if self._check_no_more_content(): + logging.info('检测到页面底部,停止滚动') + break + + return + except Exception as e: + logging.debug(f'强力滚动失败: {e}') + + # 方式2:尝试滚动到特定元素 + try: + # 查找可能的加载更多按钮或元素 + load_more_selectors = [ + "[data-e2e='load-more']", + "[class*='load-more']", + "[class*='loadmore']", + "[class*='more']", + "button", + "[role='button']" + ] + + for selector in load_more_selectors: + try: + elements = self.driver.find_elements(By.CSS_SELECTOR, selector) + for element in elements: + if element.is_displayed(): + # 滚动到元素 + self.driver.execute_script("arguments[0].scrollIntoView();", element) + logging.info(f'滚动到元素: {selector}') + time.sleep(2) + # 尝试点击 + try: + element.click() + logging.info(f'点击加载更多按钮: {selector}') + time.sleep(3) + except: + pass + return + except: + continue + except Exception as e: + logging.debug(f'滚动到元素失败: {e}') + + # 方式3:渐进式滚动 + try: + current_position = self.driver.execute_script("return window.pageYOffset;") + page_height = self.driver.execute_script("return document.body.scrollHeight;") + window_height = self.driver.execute_script("return window.innerHeight;") + + logging.info(f'当前位置: {current_position}px, 页面高度: {page_height}px, 窗口高度: {window_height}px') + + # 如果页面高度很小,说明没有数据,需要触发加载 + if page_height < 2000: + # 多次滚动触发数据加载 + for i in range(5): + self.driver.execute_script(f"window.scrollTo(0, {500 * (i+1)});") + logging.info(f'渐进滚动 {i+1}: {500 * (i+1)}px') + time.sleep(2) + else: + # 正常滚动 + scroll_distance = min(1000, page_height - current_position - window_height) + if scroll_distance > 100: + new_position = current_position + scroll_distance + self.driver.execute_script(f'window.scrollTo(0, {new_position});') + logging.info(f'滚动到位置: {new_position}px') + time.sleep(2) + + return + except Exception as e: + logging.debug(f'渐进式滚动失败: {e}') + + # 方式4:检查是否已显示"暂时没有更多了" + if self._check_no_more_content(): + logging.info('已到达页面底部:暂时没有更多了') + return + + logging.info('滚动完成,等待数据加载...') + + except Exception as e: + logging.error(f'滚动触发失败: {e}') + + def _check_no_more_content(self) -> bool: + """检查是否已到达页面底部,没有更多内容""" + try: + # 检查多种可能的底部标识文本 + bottom_indicators = [ + "暂时没有更多了", + "没有更多内容", + "已加载全部", + "加载完毕" + ] + + for indicator in bottom_indicators: + try: + result = self.driver.execute_script(f""" + var elements = document.querySelectorAll('*'); + for (var i = 0; i < 
elements.length; i++) {{ + var text = elements[i].textContent || elements[i].innerText; + if (text.includes('{indicator}')) {{ + return true; + }} + }} + return false; + """) + if result: + logging.debug(f'检测到页面底部标识: "{indicator}"') + return True + except Exception: + continue + + return False + except Exception as e: + logging.debug(f'检查页面底部失败: {e}') + return False + + def _trigger_scroll_during_collection(self): + """在数据收集过程中触发数据加载 - 简化版,仅使用滚动""" + logging.info('在数据收集过程中触发滚动加载') + + try: + # 获取初始数据量 + initial_count = len(self.collected_items) + logging.info(f'滚动前数据量: {initial_count} 个短剧') + + # 仅使用强力滚动策略,不进行不必要的刷新和按钮点击 + self._trigger_mini_scroll() + + # 检查是否有新数据加载 + final_count = len(self.collected_items) + total_new = final_count - initial_count + logging.info(f'滚动加载完成: 初始 {initial_count} → 最终 {final_count} 个短剧 (总共新增: {total_new} 个)') + + except Exception as e: + logging.warning(f'滚动加载过程中出错: {e}') + + + def _collect_from_ssr(self): + """从SSR数据收集数据""" + logging.info('开始SSR数据收集') + + # 尝试直接从window对象获取 + keys = ['_SSR_HYDRATED_DATA', 'RENDER_DATA'] + for key in keys: + try: + data = self.driver.execute_script(f'return window.{key}') + if data: + text = json.dumps(data, ensure_ascii=False) + self._parse_and_add_item(text, f'page_{key}', None, 'ssr') + logging.info(f'从 {key} 中解析完成') + except Exception: + continue + + logging.info(f'SSR数据收集完成,发现 {self.source_stats["ssr"]} 个有效项') + + def _collect_from_page(self): + """从页面解析收集数据(兜底方案)""" + logging.info('开始页面数据收集(兜底方案)') + + try: + page_source = self.driver.page_source + self._parse_and_add_item(page_source, 'page_source', None, 'page') + + # 同时尝试识别statis结构中的play_vv + for m in re.findall(r'"statis"\s*:\s*\{[^}]*"play_vv"\s*:\s*(\d+)[^}]*\}', page_source): + try: + vv = int(m) + # 从页面源码中无法获取完整的合集信息,跳过这些不完整的数据 + logging.debug(f'从页面源码statis中发现播放量: {vv},但缺少完整信息,跳过') + except Exception: + pass + + except Exception: + pass + + logging.info(f'页面数据收集完成,发现 {self.source_stats["page"]} 个有效项') + + def _parse_and_add_item(self, text: str, source_url: str, request_id: str, source_type: str): + """解析文本数据并添加到统一存储""" + try: + # 尝试解析JSON数据 + if text.strip().startswith('{') or text.strip().startswith('['): + try: + data = json.loads(text) + self._extract_from_json_data(data, source_url, request_id, source_type) + return + except json.JSONDecodeError: + pass + + # 如果不是JSON,使用正则表达式查找 + self._extract_from_text_regex(text, source_url, request_id, source_type) + + except Exception as e: + logging.debug(f'解析 {source_type} 数据时出错: {e}') + + def _extract_from_json_data(self, data, source_url: str, request_id: str, source_type: str): + """从JSON数据中递归提取合集信息""" + def extract_mix_info(obj, path=""): + if isinstance(obj, dict): + # 检查是否包含有效的合集信息 + if self._is_valid_collection_data(obj): + item_data = self._build_item_data(obj, source_url, request_id, source_type) + if item_data: + self._add_item_with_validation(item_data, source_type) + + # 递归搜索子对象 + for key, value in obj.items(): + if isinstance(value, (dict, list)): + extract_mix_info(value, f"{path}.{key}" if path else key) + + elif isinstance(obj, list): + for i, item in enumerate(obj): + if isinstance(item, (dict, list)): + extract_mix_info(item, f"{path}[{i}]" if path else f"[{i}]") + + extract_mix_info(data) + + def _extract_from_text_regex(self, text: str, source_url: str, request_id: str, source_type: str): + """使用正则表达式从文本中提取信息""" + # 查找包含完整合集信息的JSON片段 + mix_pattern = r'\{[^{}]*"mix_id"\s*:\s*"([^"]*)"[^{}]*"mix_name"\s*:\s*"([^"]*)"[^{}]*"statis"\s*:\s*\{[^{}]*"play_vv"\s*:\s*(\d+)[^{}]*\}[^{}]*\}' + + for match in 
re.finditer(mix_pattern, text): + try: + mix_id = match.group(1) + mix_name = match.group(2) + vv = int(match.group(3)) + + # 构建基础数据 + item_data = { + 'mix_id': mix_id, + 'mix_name': mix_name, + 'play_vv': vv, + 'url': source_url, + 'request_id': request_id, + 'source_type': source_type, + 'timestamp': datetime.now().isoformat() + } + + # 验证并添加 + if self._validate_item(item_data): + self._add_item_with_validation(item_data, source_type) + + except Exception: + continue + + def _is_valid_collection_data(self, obj: dict) -> bool: + """检查是否为有效的收藏合集数据""" + # 必须有mix_id和statis字段 + if 'mix_id' not in obj or 'statis' not in obj: + return False + + # statis必须是字典且包含play_vv + statis = obj.get('statis', {}) + if not isinstance(statis, dict) or 'play_vv' not in statis: + return False + + # play_vv必须是有效数字 + play_vv = statis.get('play_vv') + if not isinstance(play_vv, (int, str)): + return False + + try: + vv = int(play_vv) + # 收藏合集的短剧播放量不可能为0 + if vv <= 0: + return False + except (ValueError, TypeError): + return False + + return True + + def _build_item_data(self, obj: dict, source_url: str, request_id: str, source_type: str) -> Optional[dict]: + """构建标准化的数据项""" + try: + mix_id = obj.get('mix_id', '') + mix_name = obj.get('mix_name', '') + + # 获取播放量(与_is_valid_collection_data方法保持一致) + play_vv = 0 + + # 方式1:从statis字段获取 + if 'statis' in obj and isinstance(obj['statis'], dict): + statis = obj['statis'] + if 'play_vv' in statis: + play_vv = statis['play_vv'] + + # 方式2:直接从对象中获取play_vv + if play_vv == 0 and 'play_vv' in obj: + play_vv = obj['play_vv'] + + # 方式3:从其他可能的字段获取 + if play_vv == 0: + for field in ['play_count', 'view_count', 'vv']: + if field in obj: + play_vv = obj[field] + break + + # 转换为整数 + if isinstance(play_vv, str) and play_vv.isdigit(): + play_vv = int(play_vv) + + # 数据验证 + if not mix_id or play_vv <= 0: + return None + + # 如果mix_name为空,使用mix_id作为名称 + if not mix_name or mix_name.strip() == "": + mix_name = f"短剧_{mix_id}" + logging.warning(f"⚠️ mix_name为空,使用mix_id作为名称: {mix_name}") + + # 构建合集链接 + video_url = f"https://www.douyin.com/collection/{mix_id}" if mix_id else "" + + # 构建标准数据项 + item_data = { + 'mix_id': mix_id, + 'mix_name': mix_name, + 'play_vv': play_vv, + 'formatted': self._format_count(play_vv), + 'url': source_url, + 'request_id': request_id, + 'video_url': video_url, + 'source_type': source_type, + 'timestamp': datetime.now().isoformat() + } + + # 提取额外字段 + self._extract_additional_fields(obj, item_data) + + return item_data + + except Exception as e: + logging.debug(f'构建数据项失败: {e}') + return None + + def _extract_additional_fields(self, obj: dict, item_data: dict): + """提取额外的字段信息""" + # 提取合集封面图片URL + cover_image_url = "" + cover_image_backup_urls = [] + + # 查找封面图片字段 + for field in ['cover', 'cover_url', 'image', 'pic']: + if field in obj: + field_data = obj[field] + if isinstance(field_data, dict) and 'url_list' in field_data and field_data['url_list']: + cover_image_url = field_data['url_list'][0] + cover_image_backup_urls = field_data['url_list'][1:] if len(field_data['url_list']) > 1 else [] + break + elif isinstance(field_data, str): + cover_image_url = field_data + break + + item_data['cover_image_url'] = cover_image_url + item_data['cover_backup_urls'] = cover_image_backup_urls + + # 提取合集作者/影视工作室 + series_author = "" + for author_field in ['author', 'creator', 'user']: + if author_field in obj: + author_data = obj[author_field] + if isinstance(author_data, dict): + series_author = (author_data.get('nickname') or + author_data.get('unique_id') or + 
author_data.get('short_id') or + author_data.get('name') or '') + break + elif isinstance(author_data, str): + series_author = author_data + break + + item_data['series_author'] = series_author + + # 提取合集描述 + desc = "" + if 'desc' in obj and obj['desc']: + desc_value = str(obj['desc']).strip() + if desc_value: + desc = desc_value + + item_data['desc'] = desc + + # 提取合集总集数 + updated_to_episode = 0 + if 'statis' in obj and isinstance(obj['statis'], dict): + statis = obj['statis'] + if 'updated_to_episode' in statis: + try: + episodes = int(statis['updated_to_episode']) + if episodes > 0: + updated_to_episode = episodes + except ValueError: + pass + + item_data['updated_to_episode'] = updated_to_episode + + def _validate_item(self, item_data: dict) -> bool: + """验证数据项的有效性""" + # 基本字段验证 + mix_id = item_data.get('mix_id', '') + mix_name = item_data.get('mix_name', '') + play_vv = item_data.get('play_vv', 0) + + # 必须有mix_id和mix_name + if not mix_id or not mix_name: + return False + + # 播放量必须大于0(收藏合集的短剧不可能为0) + if play_vv <= 0: + return False + + # 排除占位名称 + if mix_name.startswith('短剧_') or '未知' in mix_name: + return False + + return True + + def _add_item_with_validation(self, item_data: dict, source_type: str): + """验证并添加数据项,包含实时去重""" + if not self._validate_item(item_data): + self.source_stats['filtered'] += 1 + return + + mix_id = item_data.get('mix_id') + + # 实时去重:保留播放量最大的版本 + if mix_id in self.collected_items: + existing = self.collected_items[mix_id] + current_play_vv = item_data.get('play_vv', 0) + existing_play_vv = existing.get('play_vv', 0) + + if current_play_vv > existing_play_vv: + # 当前数据更好,替换 + self.collected_items[mix_id] = item_data + logging.info(f'🔄 更新重复短剧: {item_data.get("mix_name")} (播放量: {existing_play_vv:,} → {current_play_vv:,})') + else: + # 已有数据更好,跳过 + logging.info(f'⏭️ 跳过重复短剧: {item_data.get("mix_name")} (当前: {current_play_vv:,}, 已有: {existing_play_vv:,})') + + # 记录去重统计 + logging.debug(f'去重统计: mix_id={mix_id}, 已有播放量={existing_play_vv:,}, 新播放量={current_play_vv:,}, 是否更新={current_play_vv > existing_play_vv}') + else: + # 新数据,直接添加 + self.collected_items[mix_id] = item_data + self.source_stats[source_type] += 1 + logging.info(f'✅ 添加新短剧: {item_data.get("mix_name")} - {item_data.get("play_vv", 0):,} 播放量') + + def _format_count(self, n: int) -> str: + """格式化数字显示""" + if n >= 100_000_000: + return f"{n/100_000_000:.1f}亿" + if n >= 10_000: + return f"{n/10_000:.1f}万" + return str(n) + + def _log_collection_stats(self): + """输出收集统计信息""" + logging.info('=' * 60) + logging.info('统一数据收集统计:') + logging.info(f' - 网络API: {self.source_stats["network"]} 个') + logging.info(f' - SSR数据: {self.source_stats["ssr"]} 个') + logging.info(f' - 页面解析: {self.source_stats["page"]} 个') + logging.info(f' - 过滤无效: {self.source_stats["filtered"]} 个') + logging.info(f' - 最终结果: {len(self.collected_items)} 个唯一短剧') + logging.info('=' * 60) + + class DouyinPlayVVScraper: def __init__(self, start_url: str = None, auto_continue: bool = False, duration_s: int = 60): self.start_url = start_url or "https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation" @@ -681,37 +1310,127 @@ class DouyinPlayVVScraper: return True # 改为假设已登录,避免卡住 def trigger_loading(self): - logging.info('触发数据加载:滚动 + 刷新') + logging.info('触发数据加载:强力滚动直到"暂时没有更多了"') - # 在auto_continue模式下增加页面加载等待时间 - if self.auto_continue: - logging.info('自动继续模式:增加页面加载等待时间') - time.sleep(10) # 增加到10秒,确保页面完全加载 + # 等待页面完全加载 + logging.info('等待页面完全加载...') + time.sleep(10) + + # 强力滚动策略 - 模拟真实用户行为,直到看到"暂时没有更多了" + max_scroll_attempts = 50 # 最大滚动尝试次数 + 
scroll_count = 0 + no_more_content_found = False + + while scroll_count < max_scroll_attempts and not no_more_content_found: + try: + scroll_count += 1 + logging.info(f'第{scroll_count}次强力滚动...') + + # 强力滚动:多次大幅度滚动确保触发懒加载 + scroll_distance = 800 + (scroll_count * 200) + + # 执行强力滚动JavaScript + self.driver.execute_script(f""" + // 1. 强制滚动页面 + window.scrollBy(0, {scroll_distance}); + document.documentElement.scrollTop += {scroll_distance}; + document.body.scrollTop += {scroll_distance}; + + // 2. 滚动到页面底部(触发懒加载) + window.scrollTo(0, document.body.scrollHeight); + + // 3. 查找并滚动所有可能的容器 + const containers = document.querySelectorAll('[data-e2e="comment-list"], .comment-list, [class*="comment"], [class*="scroll"], [role="main"], [class*="collection"], [class*="favorite"]'); + containers.forEach(container => {{ + if (container.scrollTop !== undefined) {{ + container.scrollTop = container.scrollHeight; + container.dispatchEvent(new Event('scroll', {{ bubbles: true }})); + }} + }}); + + // 4. 触发所有相关事件 + ['scroll', 'wheel', 'touchmove', 'resize'].forEach(eventType => {{ + window.dispatchEvent(new Event(eventType, {{ bubbles: true }})); + document.dispatchEvent(new Event(eventType, {{ bubbles: true }})); + }}); + + // 5. 模拟用户交互 + document.body.click(); + + console.log('执行强力滚动:', {scroll_distance}, 'px'); + """) + + # 等待数据加载 + time.sleep(3) + + # 检查是否有新数据加载 + current_height = self.driver.execute_script("return document.body.scrollHeight;") + logging.info(f'当前页面高度: {current_height}px') + + # 检查是否到达底部 - 看到"暂时没有更多了" + no_more_content_found = self._check_no_more_content() + if no_more_content_found: + logging.info('✅ 检测到页面底部:"暂时没有更多了",停止滚动') + break + + # 检查页面高度是否不再增加(说明没有新内容加载) + if scroll_count > 5: + previous_height = current_height + time.sleep(2) + new_height = self.driver.execute_script("return document.body.scrollHeight;") + if new_height == previous_height: + logging.info('页面高度不再增加,可能已加载全部内容') + break + + except Exception as e: + logging.error(f'滚动过程中出错: {e}') + time.sleep(2) + + if no_more_content_found: + logging.info('🎉 成功滚动到页面底部,所有内容已加载完成') else: - # 普通模式也需要增加页面加载等待时间 - logging.info('普通模式:增加页面加载等待时间') - time.sleep(10) # 增加到10秒,确保页面完全加载 + logging.info(f'达到最大滚动次数 {max_scroll_attempts},停止滚动') - # 第一轮滚动:触发懒加载 - logging.info('第一轮滚动:触发懒加载') - for i in range(10): # 增加滚动次数 - self.driver.execute_script(f'window.scrollTo(0, {i * 900});') - time.sleep(1.5) # 增加等待时间 - - # 等待数据加载 - logging.info('等待数据加载...') - time.sleep(5) - - # 刷新触发新请求 - logging.info('刷新页面触发新请求') - self.driver.refresh() - time.sleep(6) # 增加刷新后的等待时间 - - # 第二轮滚动:确保所有数据加载 - logging.info('第二轮滚动:确保所有数据加载') - for i in range(8): - self.driver.execute_script(f'window.scrollTo(0, {i * 1200});') - time.sleep(1.5) + # 最终检查一次是否还有更多内容 + final_check = self._check_no_more_content() + if not final_check: + logging.info('⚠️ 最终检查:可能还有更多内容未加载') + + def _check_no_more_content(self) -> bool: + """检查是否已到达页面底部,没有更多内容""" + try: + # 检查多种可能的底部标识文本 + bottom_indicators = [ + "暂时没有更多了", + "没有更多内容", + "已加载全部", + "加载完毕", + "no more content", + "end of content" + ] + + for indicator in bottom_indicators: + try: + result = self.driver.execute_script(f""" + var elements = document.querySelectorAll('*'); + for (var i = 0; i < elements.length; i++) {{ + var text = elements[i].textContent || elements[i].innerText; + if (text.includes('{indicator}')) {{ + return true; + }} + }} + return false; + """) + if result: + logging.info(f'✅ 检测到页面底部标识: "{indicator}"') + return True + except Exception: + continue + + return False + except Exception as e: + logging.debug(f'检查页面底部失败: {e}') + 
return False def format_count(self, n: int) -> str: if n >= 100_000_000: @@ -774,463 +1493,9 @@ class DouyinPlayVVScraper: except Exception as e: logging.error(f'保存评论失败: {e}') - return None - - def parse_play_vv_from_text(self, text: str, source_url: str, request_id: str = None): - """解析文本中的play_vv、mix_name和watched_item信息""" - try: - # 尝试解析JSON数据 - if text.strip().startswith('{') or text.strip().startswith('['): - try: - data = json.loads(text) - self._extract_from_json_data(data, source_url, request_id) - return - except json.JSONDecodeError: - pass - - # 如果不是JSON,使用正则表达式查找 - self._extract_from_text_regex(text, source_url, request_id) - - except Exception as e: - logging.warning(f'解析文本数据时出错: {e}') + return None - def _extract_from_json_data(self, data, source_url: str, request_id: str = None): - """从JSON数据中递归提取合集信息""" - def extract_mix_info(obj, path=""): - if isinstance(obj, dict): - # 检查是否包含合集信息 - if 'mix_id' in obj and 'statis' in obj: - mix_id = obj.get('mix_id', '') - mix_name = obj.get('mix_name', '') - statis = obj.get('statis', {}) - - if isinstance(statis, dict) and 'play_vv' in statis: - play_vv = statis.get('play_vv') - if isinstance(play_vv, (int, str)) and str(play_vv).isdigit(): - vv = int(play_vv) - - # 数据验证:确保有mix_id(按短剧ID去重,所以必须有mix_id) - if not mix_id or mix_id.strip() == "": - logging.warning(f"跳过缺少mix_id的数据: play_vv={vv}, mix_name={mix_name}") - # 跳过当前项,但继续递归解析其他数据(不使用return) - else: - # 如果mix_name为空,使用mix_id作为名称 - if not mix_name or mix_name.strip() == "": - mix_name = f"短剧_{mix_id}" - logging.warning(f"⚠️ mix_name为空,使用mix_id作为名称: {mix_name}") - # 🔧 修复:不跳过播放量为0的数据,而是标记并保留 - # 这些数据可能是因为页面加载不完整,但合集本身是存在的 - # 警告信息移到去重检查之后,只有真正添加时才警告 - - # 构建合集链接 - video_url = f"https://www.douyin.com/collection/{mix_id}" if mix_id else "" - - # 提取合集封面图片URL - 直接存储完整的图片链接 - cover_image_url = "" - cover_image_backup_urls = [] # 备用链接列表 - # 查找封面图片字段,优先获取完整的URL链接 - if 'cover' in obj: - cover = obj['cover'] - if isinstance(cover, dict) and 'url_list' in cover and cover['url_list']: - # 主链接 - cover_image_url = cover['url_list'][0] - # 备用链接 - cover_image_backup_urls = cover['url_list'][1:] if len(cover['url_list']) > 1 else [] - elif isinstance(cover, str): - cover_image_url = cover - elif 'cover_url' in obj: - cover_url = obj['cover_url'] - if isinstance(cover_url, dict) and 'url_list' in cover_url and cover_url['url_list']: - cover_image_url = cover_url['url_list'][0] - cover_image_backup_urls = cover_url['url_list'][1:] if len(cover_url['url_list']) > 1 else [] - elif isinstance(cover_url, str): - cover_image_url = cover_url - elif 'image' in obj: - image = obj['image'] - if isinstance(image, dict) and 'url_list' in image and image['url_list']: - cover_image_url = image['url_list'][0] - cover_image_backup_urls = image['url_list'][1:] if len(image['url_list']) > 1 else [] - elif isinstance(image, str): - cover_image_url = image - elif 'pic' in obj: - pic = obj['pic'] - if isinstance(pic, dict) and 'url_list' in pic and pic['url_list']: - cover_image_url = pic['url_list'][0] - cover_image_backup_urls = pic['url_list'][1:] if len(pic['url_list']) > 1 else [] - elif isinstance(pic, str): - cover_image_url = pic - - # 提取新增的五个字段 - series_author = "" - desc = "" - updated_to_episode = 0 - manufacturing_field = "" # 承制信息 - copyright_field = "" # 版权信息 - - # 提取合集作者/影视工作室 - if 'author' in obj: - author = obj['author'] - if isinstance(author, dict): - # 尝试多个可能的作者字段 - series_author = (author.get('nickname') or - author.get('unique_id') or - author.get('short_id') or - author.get('name') or '') - elif 
isinstance(author, str): - series_author = author - elif 'creator' in obj: - creator = obj['creator'] - if isinstance(creator, dict): - series_author = (creator.get('nickname') or - creator.get('unique_id') or - creator.get('name') or '') - elif isinstance(creator, str): - series_author = creator - elif 'user' in obj: - user = obj['user'] - if isinstance(user, dict): - series_author = (user.get('nickname') or - user.get('unique_id') or - user.get('name') or '') - elif isinstance(user, str): - series_author = user - - # 提取合集描述 - 扩展更多可能的字段 - description_fields = ['desc', 'share_info'] # 保持字段列表 - - # 先检查desc字段 - if 'desc' in obj and obj['desc']: - desc_value = str(obj['desc']).strip() - if desc_value: - desc = desc_value - logging.info(f"从desc提取到描述") - - # 如果desc中没有找到有效描述,检查share_info - if not desc and 'share_info' in obj and isinstance(obj['share_info'], dict): - share_desc = obj['share_info'].get('share_desc', '').strip() - if share_desc: - desc = share_desc - logging.info(f"从share_info.share_desc提取到描述") - - # 如果share_info中没有找到有效描述,继续检查desc字段 - if not desc: - for field in description_fields: - if field in obj and obj[field]: - desc_value = str(obj[field]).strip() - if desc_value: - desc = desc_value - logging.info(f"从{field}提取到描述") - break - - # 如果还没有找到描述,尝试从嵌套对象中查找desc字段 - if not desc: - def search_nested_desc(data, depth=0): - if depth > 3: # 限制递归深度 - return None - - if isinstance(data, dict): - # 检查当前层级的desc字段 - if 'desc' in data and data['desc']: - desc_value = str(data['desc']).strip() - if 5 <= len(desc_value) <= 1000: - return desc_value - - # 递归检查嵌套对象 - for value in data.values(): - if isinstance(value, dict): - nested_result = search_nested_desc(value, depth + 1) - if nested_result: - return nested_result - return None - - desc = search_nested_desc(obj) - - - # 提取合集总集数 - 从statis字段中获取 - updated_to_episode = 0 # 初始化默认值 - if 'statis' in obj and isinstance(obj['statis'], dict): - statis = obj['statis'] - if 'updated_to_episode' in statis: - try: - episodes = int(statis['updated_to_episode']) - if episodes > 0: - updated_to_episode = episodes - logging.info(f"从statis.updated_to_episode提取到集数: {episodes}") - except ValueError: - logging.warning("updated_to_episode字段值无法转换为整数") - else: - logging.info("未找到statis字段或statis不是字典类型") - try: - episodes = int(obj['updated_to_episode']) - if episodes > 0: - updated_to_episode = episodes - logging.info(f"从updated_to_episode提取到集数: {episodes}") - except ValueError: - pass # 忽略无法转换为整数的情况 - - # 构建合集数据 - item_data = { - 'play_vv': vv, - 'formatted': self.format_count(vv), - 'url': source_url, - 'request_id': request_id, - 'mix_name': mix_name, - 'video_url': video_url, # 合集链接 - 'mix_id': mix_id, # 合集ID - 'cover_image_url': cover_image_url, # 合集封面图片主链接(完整URL) - 'cover_backup_urls': cover_image_backup_urls, # 封面图片备用链接列表 - 'series_author': series_author, # 合集作者/影视工作室 - 'desc': desc, # 合集描述 - 'updated_to_episode': updated_to_episode, # 合集总集数 - 'timestamp': datetime.now().isoformat() - } - - # 🔧 修复:添加前检查是否已存在(避免重复) - # 检查是否已经有相同mix_id的数据 - existing_item = None - for existing in self.play_vv_items: - if existing.get('mix_id') == mix_id: - existing_item = existing - break - - if existing_item: - # 如果已存在,比较播放量,保留更大的 - existing_vv = existing_item.get('play_vv', 0) - if vv > existing_vv: - # 当前数据更好,替换 - logging.info(f'🔄 更新重复短剧: {mix_name} (播放量: {existing_vv:,} → {vv:,})') - self.play_vv_items.remove(existing_item) - self.play_vv_items.append(item_data) - else: - # 已有数据更好,跳过当前数据但继续递归解析其他数据 - logging.info(f'⏭️ 跳过重复短剧: {mix_name} (当前: {vv:,}, 已有: {existing_vv:,})') - 
# 注意:不使用return,避免中断递归解析 - else: - # 不存在,直接添加 - self.play_vv_items.append(item_data) - - # 只有在真正添加时,才对播放量为0的数据发出警告 - if vv <= 0: - logging.warning(f"⚠️ 添加了播放量为0的数据: {mix_name} (ID: {mix_id})") - logging.warning(f" 这可能需要后续重新获取播放量") - - # 🔧 修复:不在数据收集阶段进行实时保存 - # 实时保存会触发获取详细内容,导致数据收集中断 - # 改为在数据收集完成后统一处理 - # if self.realtime_save_enabled: - # self.save_single_item_realtime(item_data) - - logging.info(f'提取到合集: {mix_name} (ID: {mix_id}) - {vv:,} 播放量') - if series_author: - logging.info(f' 作者: {series_author}') - if desc: - logging.info(f' 描述: {desc[:100]}{"..." if len(desc) > 100 else ""}') - if updated_to_episode > 0: - logging.info(f' 总集数: {updated_to_episode}') - - # 递归搜索子对象 - for key, value in obj.items(): - if isinstance(value, (dict, list)): - extract_mix_info(value, f"{path}.{key}" if path else key) - - elif isinstance(obj, list): - for i, item in enumerate(obj): - if isinstance(item, (dict, list)): - extract_mix_info(item, f"{path}[{i}]" if path else f"[{i}]") - - extract_mix_info(data) - - def _extract_from_text_regex(self, text: str, source_url: str, request_id: str = None): - """使用正则表达式从文本中提取信息""" - # 查找包含完整合集信息的JSON片段,包括statis中的updated_to_episode - mix_pattern = r'\{[^{}]*"mix_id"\s*:\s*"([^"]*)"[^{}]*"mix_name"\s*:\s*"([^"]*)"[^{}]*"statis"\s*:\s*\{[^{}]*"play_vv"\s*:\s*(\d+)[^{}]*"updated_to_episode"\s*:\s*(\d+)[^{}]*\}[^{}]*\}' - - for match in re.finditer(mix_pattern, text): - try: - mix_id = match.group(1) - mix_name = match.group(2) - vv = int(match.group(3)) - episodes = int(match.group(4)) - - # 数据验证:确保有mix_id(按短剧ID去重) - # 注意:播放量为0的数据也会被保存,可能是新发布的短剧 - if vv <= 0: - logging.warning(f"⚠️ 发现播放量为0的数据: mix_name={mix_name}, play_vv={vv},仍会保存") - - # 检查mix_id,如果没有则跳过 - if not mix_id or mix_id.strip() == "": - logging.warning(f"正则提取跳过缺少mix_id的数据: play_vv={vv}, mix_name={mix_name}") - continue - - # 如果mix_name为空,使用mix_id作为名称 - if not mix_name or mix_name.strip() == "": - mix_name = f"短剧_{mix_id}" - logging.warning(f"⚠️ mix_name为空,使用mix_id作为名称: {mix_name}") - - # 构建合集链接 - video_url = f"https://www.douyin.com/collection/{mix_id}" if mix_id else "" - - if episodes > 0: - logging.info(f"从statis.updated_to_episode提取到集数: {episodes}") - - # 构建合集数据 - item_data = { - 'play_vv': vv, - 'formatted': self.format_count(vv), - 'url': source_url, - 'request_id': request_id, - 'mix_name': mix_name, - 'video_url': video_url, # 合集链接 - 'mix_id': mix_id, # 合集ID - 'updated_to_episode': episodes if episodes > 0 else None, # 从statis.updated_to_episode提取的集数 - 'timestamp': datetime.now().isoformat() - } - - # 添加到列表(保持原有逻辑) - self.play_vv_items.append(item_data) - - logging.info(f'正则提取到合集: {mix_name} (ID: {mix_id}) - {vv:,} 播放量') - except Exception: - continue - - # 兜底:查找单独的play_vv值 - for match in re.findall(r'"play_vv"\s*:\s*(\d+)', text): - try: - vv = int(match) - # 数据验证:播放量为0的数据也会被保存 - if vv <= 0: - logging.warning(f"⚠️ 发现播放量为0的数据: play_vv={vv},仍会保存") - - # 检查是否已经存在相同的play_vv - if not any(item['play_vv'] == vv for item in self.play_vv_items): - # 由于无法获取完整的合集信息,跳过这些不完整的数据 - # 避免产生mix_name为空的无效记录 - logging.warning(f"跳过不完整的数据记录: play_vv={vv}, 缺少合集名称") - continue - except Exception: - continue - - def collect_network_bodies(self, duration_s: int = None): - if duration_s is None: - duration_s = self.duration_s - logging.info(f'开始收集网络响应体,持续 {duration_s}s') - start = time.time() - known_request_ids = set() - - # 目标关键词(收藏/合集/视频) - url_keywords = ['aweme', 'mix', 'collection', 'favorite', 'note', 'api'] - - last_progress = 0 - while time.time() - start < duration_s: - try: - logs = self.driver.get_log('performance') 
- except Exception as e: - import traceback - error_details = { - 'error_type': type(e).__name__, - 'error_message': str(e), - 'traceback': traceback.format_exc(), - 'context': '获取Chrome性能日志' - } - logging.warning(f'获取性能日志失败: {error_details["error_type"]} - {error_details["error_message"]}') - logging.warning(f'详细错误信息: {error_details["traceback"]}') - logging.warning(f'错误上下文: {error_details["context"]}') - time.sleep(1) - continue - - for entry in logs: - try: - message = json.loads(entry['message'])['message'] - except Exception: - continue - - method = message.get('method') - params = message.get('params', {}) - - # 记录请求URL - if method == 'Network.requestWillBeSent': - req_id = params.get('requestId') - url = params.get('request', {}).get('url', '') - if any(k in url for k in url_keywords): - self.captured_responses.append({'requestId': req_id, 'url': url, 'type': 'request'}) - - # 响应到达,尝试获取响应体 - if method == 'Network.responseReceived': - req_id = params.get('requestId') - url = params.get('response', {}).get('url', '') - type_ = params.get('type') # XHR, Fetch, Document - if req_id and req_id not in known_request_ids: - known_request_ids.add(req_id) - # 仅处理XHR/Fetch - if type_ in ('XHR', 'Fetch') and any(k in url for k in url_keywords): - try: - body_obj = self.driver.execute_cdp_cmd('Network.getResponseBody', {'requestId': req_id}) - body_text = body_obj.get('body', '') - # 可能是base64编码 - if body_obj.get('base64Encoded'): - try: - body_text = base64.b64decode(body_text).decode('utf-8', errors='ignore') - except Exception: - pass - - # 解析play_vv - self.parse_play_vv_from_text(body_text, url, req_id) - except Exception: - # 某些响应不可获取或过大 - pass - elapsed = int(time.time() - start) - if elapsed - last_progress >= 5: - last_progress = elapsed - logging.info(f'进度: {elapsed}/{duration_s}, 目标数量: {len(self.play_vv_items)}') - time.sleep(0.8) - - logging.info(f'网络收集完成,共发现 {len(self.play_vv_items)} 个目标') - logging.info(f'=' * 60) - logging.info(f'网络收集阶段统计:') - logging.info(f' - 总数量: {len(self.play_vv_items)} 个合集') - logging.info(f' - 播放量为0: {sum(1 for item in self.play_vv_items if item.get("play_vv", 0) == 0)} 个') - logging.info(f' - 播放量正常: {sum(1 for item in self.play_vv_items if item.get("play_vv", 0) > 0)} 个') - logging.info(f'=' * 60) - logging.info(f'开始解析SSR数据...') - - - def parse_ssr_data(self): - logging.info('尝试解析页面SSR数据') - # 尝试直接从window对象获取 - keys = ['_SSR_HYDRATED_DATA', 'RENDER_DATA'] - for key in keys: - try: - data = self.driver.execute_script(f'return window.{key}') - if data: - text = json.dumps(data, ensure_ascii=False) - self.parse_play_vv_from_text(text, f'page_{key}', None) - logging.info(f'从 {key} 中解析完成') - except Exception: - continue - - # 兜底:从page_source中正则查找 - try: - page_source = self.driver.page_source - self.parse_play_vv_from_text(page_source, 'page_source', None) - # 同时尝试识别statis结构中的play_vv - for m in re.findall(r'"statis"\s*:\s*\{[^}]*"play_vv"\s*:\s*(\d+)[^}]*\}', page_source): - try: - vv = int(m) - # 数据验证:播放量为0的数据也会被保存 - if vv <= 0: - logging.warning(f"⚠️ 发现播放量为0的数据: play_vv={vv},仍会保存") - - # 检查是否已经存在相同的play_vv - if not any(item['play_vv'] == vv for item in self.play_vv_items): - # 由于从statis中无法获取完整的合集信息,跳过这些不完整的数据 - # 避免产生mix_name为空的无效记录 - logging.warning(f"跳过不完整的数据记录: play_vv={vv}, 来源statis但缺少合集名称") - continue - except Exception: - pass - except Exception: - pass def dedupe(self): # 🔧 修复:按mix_id去重,保留播放量最大的那个 @@ -2291,18 +2556,16 @@ class DouyinPlayVVScraper: # 等待页面加载完成 try: - - WebDriverWait(self.driver, 10).until( EC.presence_of_element_located((By.TAG_NAME, 
"video")) ) except Exception as e: logging.warning(f'等待视频元素超时: {e}') - + # 获取网络请求日志 logs = self.driver.get_log('performance') video_info = {} - + for entry in logs: try: log = json.loads(entry['message'])['message'] @@ -2334,7 +2597,7 @@ class DouyinPlayVVScraper: break except Exception as e: logging.warning(f'解析日志条目时出错: {e}') - + return video_info def get_collection_videos(self, mix_id: str, mix_name: str = '', current_episode_count: int = 0) -> list: @@ -3950,34 +4213,51 @@ class DouyinPlayVVScraper: try: # 在开始抓取前清理旧数据(保留最近7天) self.cleanup_old_management_data(days_to_keep=7) - + self.setup_driver() self.navigate() self.ensure_login() self.trigger_loading() - + logging.info('=' * 60) - logging.info('开始数据收集阶段') + logging.info('开始统一数据收集') logging.info('=' * 60) - self.collect_network_bodies() - logging.info(f'✅ 网络数据收集完成:{len(self.play_vv_items)} 个合集') - - self.parse_ssr_data() - logging.info(f'✅ SSR数据解析完成:{len(self.play_vv_items)} 个合集') - + + # 使用统一数据收集器 + collector = UnifiedDataCollector(self.driver, self.duration_s) + collected_data = collector.collect_all_data() + + # 将收集到的数据转换为原有格式 + self.play_vv_items = [] + for item in collected_data: + self.play_vv_items.append({ + 'play_vv': item.get('play_vv', 0), + 'formatted': item.get('formatted', ''), + 'url': item.get('url', ''), + 'request_id': item.get('request_id', ''), + 'mix_name': item.get('mix_name', ''), + 'video_url': item.get('video_url', ''), + 'mix_id': item.get('mix_id', ''), + 'cover_image_url': item.get('cover_image_url', ''), + 'cover_backup_urls': item.get('cover_backup_urls', []), + 'series_author': item.get('series_author', ''), + 'desc': item.get('desc', ''), + 'updated_to_episode': item.get('updated_to_episode', 0), + 'timestamp': item.get('timestamp', '') + }) + + logging.info(f'✅ 统一数据收集完成:{len(self.play_vv_items)} 个合集') + + # 统一数据收集器已实时去重,无需额外去重步骤 logging.info('=' * 60) - logging.info('开始数据去重') + logging.info('数据去重已完成(统一收集器实时处理)') logging.info('=' * 60) - before_dedupe = len(self.play_vv_items) - self.dedupe() - after_dedupe = len(self.play_vv_items) - logging.info(f'✅ 去重完成:{before_dedupe} → {after_dedupe} (移除 {before_dedupe - after_dedupe} 个重复项)') - + logging.info('=' * 60) logging.info('开始保存数据') logging.info('=' * 60) self.save_results() - + logging.info('=' * 60) logging.info(f'✅ 全部完成!共处理 {len(self.play_vv_items)} 个合集') logging.info('=' * 60)