#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Scrape real play counts (play_vv) of Douyin favorite compilations via
Selenium + Chrome DevTools Protocol.

Core capabilities:
- Enable CDP network events, fetch response bodies and parse ``play_vv``.
- Reuse a local Chrome user-data profile to bypass the login wall.
- Auto-scroll and refresh the page to trigger additional API requests.
- Also parse SSR data embedded in the page
  (``window._SSR_HYDRATED_DATA`` / ``RENDER_DATA``).

Usage:
1) By default the already-logged-in Chrome profile under
   ``config/chrome_profile`` is reused.
2) If login is still required, complete it in the spawned Chrome window and
   confirm back in the terminal.
3) The program scrolls and refreshes, collects network traffic and extracts
   ``play_vv`` automatically.
"""

import base64
import json
import logging
import os
import re
import shutil
import subprocess
import time
from datetime import datetime

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
# Kept importable but unused by default to avoid slow network downloads.
from webdriver_manager.chrome import ChromeDriverManager  # noqa: F401
import chromedriver_autoinstaller  # noqa: F401
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

# Logging setup: ensure a logs/ directory exists next to this script so the
# FileHandler never fails on first run.
script_dir = os.path.dirname(os.path.abspath(__file__))
logs_dir = os.path.join(script_dir, 'logs')
os.makedirs(logs_dir, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='[%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(logs_dir, 'douyin_scraper.log'),
                            encoding='utf-8'),
        logging.StreamHandler(),
    ],
)


class DouyinPlayVVScraper:
    """Collects compilation play counts from Douyin via CDP network capture.

    Results are accumulated in ``self.play_vv_items`` (list of dicts with
    keys such as ``play_vv``, ``formatted``, ``url``, ``request_id``,
    ``mix_name``) and persisted both to local JSON/TXT files and, when a
    connection is available, to MongoDB.
    """

    def __init__(self, start_url: str = None, auto_continue: bool = False,
                 duration_s: int = 60):
        # Default landing page: the user's own "favorites -> compilations" tab.
        self.start_url = start_url or "https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation"
        self.auto_continue = auto_continue
        self.duration_s = duration_s
        self.driver = None
        # list of dicts: {play_vv, formatted, url, request_id, mix_name, ...}
        self.play_vv_items = []
        self.captured_responses = []
        self.collected_aweme_ids = []  # all video IDs collected so far
        self.mix_aweme_mapping = {}    # mix_id -> list of aweme_ids
        self.mongo_client = None
        self.db = None
        self.collection = None
        self._cleanup_old_profiles()
        self._setup_mongodb()

    def _setup_mongodb(self):
        """Establish the MongoDB connection (best effort).

        On any failure the client/db/collection stay ``None`` and results are
        only written to local files.
        """
        try:
            # Connection parameters are overridable through the environment.
            mongo_host = os.environ.get('MONGO_HOST', 'localhost')
            mongo_port = int(os.environ.get('MONGO_PORT', 27017))
            mongo_db = os.environ.get('MONGO_DB', 'Rankings')
            mongo_collection = os.environ.get('MONGO_COLLECTION', 'Rankings_list')
            self.mongo_client = MongoClient(mongo_host, mongo_port,
                                            serverSelectionTimeoutMS=5000)
            # Force an actual round-trip to validate the connection now.
            self.mongo_client.admin.command('ping')
            self.db = self.mongo_client[mongo_db]
            self.collection = self.db[mongo_collection]
            logging.info(f'MongoDB连接成功: {mongo_host}:{mongo_port}/{mongo_db}.{mongo_collection}')
        except ConnectionFailure as e:
            logging.warning(f'MongoDB连接失败: {e}')
            logging.info('将仅保存到本地文件')
            self.mongo_client = None
            self.db = None
            self.collection = None
        except Exception as e:
            logging.warning(f'MongoDB设置出错: {e}')
            self.mongo_client = None
            self.db = None
            self.collection = None

    def _cleanup_old_profiles(self):
        """Delete temporary ``run_<ts>`` Chrome profiles older than one day."""
        try:
            script_dir = os.path.dirname(os.path.abspath(__file__))
            profile_base_dir = os.path.join(script_dir, 'config', 'chrome_profile')
            if not os.path.exists(profile_base_dir):
                return
            current_time = time.time()
            one_day_ago = current_time - 24 * 60 * 60  # 24 hours ago
            for item in os.listdir(profile_base_dir):
                if item.startswith('run_'):
                    item_path = os.path.join(profile_base_dir, item)
                    if os.path.isdir(item_path):
                        try:
                            # Directory name encodes its creation timestamp.
                            timestamp = int(item.split('_')[1])
                            if timestamp < one_day_ago:
                                shutil.rmtree(item_path, ignore_errors=True)
                                logging.info(f'清理旧配置文件: {item}')
                        except (ValueError, IndexError):
                            # Unparseable timestamp — leave the directory alone.
                            continue
        except Exception as e:
            logging.warning(f'清理旧配置文件时出错: {e}')

    def _cleanup_chrome_processes(self):
        """Kill Chrome processes holding the persistent profile directory.

        Returns True when at least one process was terminated. Falls back to
        ``taskkill`` (Windows) when psutil is unavailable.
        """
        try:
            import psutil  # optional dependency; fall back below if missing

            script_dir = os.path.dirname(os.path.abspath(__file__))
            profile_dir = os.path.join(script_dir, 'config', 'chrome_profile',
                                       'douyin_persistent')
            # Find Chrome processes whose command line references our profile.
            killed_processes = []
            for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
                try:
                    if proc.info['name'] and 'chrome' in proc.info['name'].lower():
                        cmdline = proc.info['cmdline']
                        if cmdline and any(profile_dir in arg for arg in cmdline):
                            proc.terminate()
                            killed_processes.append(proc.info['pid'])
                            logging.info(f'终止占用配置文件的Chrome进程: PID {proc.info["pid"]}')
                except (psutil.NoSuchProcess, psutil.AccessDenied,
                        psutil.ZombieProcess):
                    continue
            # Give the terminated processes a moment to release the profile.
            if killed_processes:
                time.sleep(2)
            return len(killed_processes) > 0
        except ImportError:
            # psutil not installed: use an OS-level command instead.
            try:
                result = subprocess.run(['taskkill', '/f', '/im', 'chrome.exe'],
                                        capture_output=True, text=True, timeout=10)
                if result.returncode == 0:
                    logging.info('使用taskkill清理Chrome进程')
                    time.sleep(2)
                    return True
            except Exception as e:
                logging.warning(f'清理Chrome进程失败: {e}')
            return False
        except Exception as e:
            logging.warning(f'清理Chrome进程时出错: {e}')
            return False

    def setup_driver(self):
        """Start Chrome with CDP performance logging and anti-bot tweaks.

        Raises RuntimeError when no working chromedriver can be located.
        """
        logging.info('初始化Chrome WebDriver (启用CDP网络日志)')
        # Free the profile directory first if another Chrome holds it.
        self._cleanup_chrome_processes()

        chrome_options = Options()
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
        chrome_options.add_experimental_option('useAutomationExtension', False)
        chrome_options.add_argument('--disable-extensions')
        chrome_options.add_argument('--remote-allow-origins=*')
        chrome_options.add_argument('--remote-debugging-port=0')
        chrome_options.add_argument('--start-maximized')
        chrome_options.add_argument('--lang=zh-CN')

        # Persistent user-data dir keeps the login session between runs.
        script_dir = os.path.dirname(os.path.abspath(__file__))
        profile_dir = os.path.join(script_dir, 'config', 'chrome_profile',
                                   'douyin_persistent')
        os.makedirs(profile_dir, exist_ok=True)
        chrome_options.add_argument(f'--user-data-dir={profile_dir}')
        logging.info(f'使用持久化Chrome配置文件: {profile_dir}')

        # Explicit Chrome binary paths (common install locations on Windows).
        possible_chrome_bins = [
            r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe",
            r"C:\Program Files\Google\Chrome\Application\chrome.exe",
        ]
        for bin_path in possible_chrome_bins:
            if os.path.exists(bin_path):
                chrome_options.binary_location = bin_path
                logging.info(f'使用Chrome二进制路径: {bin_path}')
                break

        # Performance log capability surfaces CDP Network.* events.
        chrome_options.set_capability('goog:loggingPrefs', {'performance': 'ALL'})

        # Prefer local / PATH chromedrivers to avoid network downloads.
        driver_ready = False
        candidates = []
        # Environment variable overrides everything else.
        env_override = os.environ.get('OVERRIDE_CHROMEDRIVER')
        if env_override:
            candidates.append(env_override)
            logging.info(f'检测到环境变量 OVERRIDE_CHROMEDRIVER,优先使用: {env_override}')
        # User-provided driver location.
        user_driver_path = os.path.join(os.getcwd(), 'drivers', 'chromedriver.exe')
        candidates.append(user_driver_path)
        logging.info(f'优先尝试用户提供路径: {user_driver_path}')
        # Project root.
        candidates.append(os.path.join(os.getcwd(), 'chromedriver.exe'))
        # Other plausible locations.
        candidates.append(os.path.join(os.getcwd(), 'drivers', 'chromedriver'))
        # chromedriver on PATH.
        which_path = shutil.which('chromedriver')
        if which_path:
            candidates.append(which_path)

        if not driver_ready:
            for p in candidates:
                try:
                    if p and os.path.exists(p):
                        logging.info(f'尝试使用chromedriver: {p}')
                        service = Service(p)
                        self.driver = webdriver.Chrome(service=service,
                                                       options=chrome_options)
                        driver_ready = True
                        logging.info(f'使用chromedriver启动成功: {p}')
                        try:
                            caps = self.driver.capabilities
                            browser_ver = caps.get('browserVersion') or caps.get('version')
                            cdver = caps.get('chrome', {}).get('chromedriverVersion')
                            logging.info(f'Chrome版本: {browser_ver}, ChromeDriver版本: {cdver}')
                        except Exception:
                            pass
                        break
                    else:
                        logging.info(f'候选路径不存在: {p}')
                except Exception as e:
                    logging.warning(f'尝试使用 {p} 启动失败: {e}')

        if not driver_ready:
            # Last resort: webdriver-manager (may need network access).
            try:
                service = Service(ChromeDriverManager().install())
                self.driver = webdriver.Chrome(service=service,
                                               options=chrome_options)
                driver_ready = True
                logging.info('使用webdriver-manager成功启动ChromeDriver')
            except Exception as e:
                raise RuntimeError('未能启动ChromeDriver。请手动下载匹配版本的chromedriver到项目根目录或PATH,或检查网络以允许webdriver-manager下载。错误: ' + str(e))

        # Anti-detection: hide navigator.webdriver from page scripts.
        try:
            self.driver.execute_script(
                "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
        except Exception:
            pass

        # Enable CDP Network domain so getResponseBody works later.
        try:
            self.driver.execute_cdp_cmd('Network.enable', {})
            logging.info('已启用CDP Network')
        except Exception as e:
            logging.warning(f'启用CDP Network失败: {e}')

    def navigate(self):
        """Open the configured start URL and wait for the initial render."""
        logging.info(f'导航到: {self.start_url}')
        self.driver.get(self.start_url)
        time.sleep(3)

    def ensure_login(self):
        """Make sure the user is logged in and on the compilations page.

        Falls through immediately when the automatic check succeeds or when
        ``auto_continue`` is set; otherwise loops on manual confirmation.
        """
        logging.info("检测登录状态和页面位置...")
        # Fast path: already logged in and on the right page.
        if self._check_login_and_page():
            logging.info("检测到已登录且在收藏合集页面,跳过手动确认")
            return

        # Manual login flow.
        logging.info("请在弹出的浏览器中手动完成登录。")
        if self.auto_continue:
            logging.info('自动继续模式,跳过手动等待...')
            time.sleep(5)
            return

        logging.info("进入手动登录确认循环...")
        while True:
            # Require the user to type an explicit confirmation token.
            logging.info("等待用户输入确认...")
            user_input = input("请在浏览器中完成登录,并导航到【我的】→【收藏】→【合集】页面。操作完成后,请在此处输入 'ok' 并按回车: ")
            if user_input.strip().lower() != 'ok':
                logging.warning("请输入 'ok' 确认您已完成登录并导航到【我的】→【收藏】→【合集】页面。")
                continue
            logging.info("用户已确认,检查当前页面...")
            try:
                current_url = self.driver.current_url
                logging.info(f"当前页面URL: {current_url}")
                if ("douyin.com/user/self" in current_url
                        and ("favorite_collection" in current_url
                             or "compilation" in current_url)):
                    logging.info(f"已确认您位于收藏合集列表页面: {current_url}")
                    logging.info("脚本将继续执行...")
                    break
                else:
                    # Confirmed but wrong page — keep waiting.
                    logging.warning(f"检测到当前页面 ({current_url}) 并非收藏合集列表页面。请确保已导航至【我的】→【收藏】→【合集】页面。")
            except Exception as e:
                if ("browser has been closed" in str(e)
                        or "no such window" in str(e)
                        or "target window already closed" in str(e)):
                    logging.error("浏览器窗口已关闭,脚本无法继续。")
                    raise RuntimeError("浏览器窗口已关闭")
                logging.warning(f"检测URL时出错: {e}。请重试。")
            time.sleep(1)

    def _check_login_and_page(self, timeout: int = 30) -> bool:
        """Return True when logged in and on the compilations page.

        Navigates to ``start_url`` automatically when logged in but on a
        different page.
        """
        try:
            current_url = self.driver.current_url
            logging.info(f"当前页面URL: {current_url}")
            if ("douyin.com/user/self" in current_url
                    and ("favorite_collection" in current_url
                         or "compilation" in current_url)):
                # Right page — still need to confirm the session is live.
                return self._detect_login_status(timeout)
            else:
                # Wrong page: if logged in, jump to the compilations page.
                if self._detect_login_status(timeout):
                    logging.info("已登录但不在收藏合集页面,自动导航...")
                    self.driver.get(self.start_url)
                    time.sleep(3)
                    return True
                return False
        except Exception as e:
            logging.warning(f"检查登录状态时出错: {e}")
            return False

    def _detect_login_status(self, timeout: int = 30) -> bool:
        """Heuristically detect login by probing avatar / login-button DOM."""
        try:
            start = time.time()
            while time.time() - start < timeout:
                time.sleep(2)
                # Selectors that indicate a logged-in user (avatar present).
                selectors = [
                    '[data-e2e="user-avatar"]',
                    '.user-avatar',
                    '[class*="avatar"]',
                    '[class*="Avatar"]',
                ]
                for selector in selectors:
                    try:
                        elements = self.driver.find_elements("css selector", selector)
                        if elements:
                            logging.info("检测到用户头像,确认已登录")
                            return True
                    except Exception:
                        continue
                # Selectors that indicate the user is NOT logged in.
                login_selectors = [
                    '[data-e2e="login-button"]',
                    'button[class*="login"]',
                    'a[href*="login"]',
                ]
                for selector in login_selectors:
                    try:
                        elements = self.driver.find_elements("css selector", selector)
                        if elements:
                            logging.info("检测到登录按钮,用户未登录")
                            return False
                    except Exception:
                        continue
            logging.info("登录状态检测超时,假设未登录")
            return False
        except Exception as e:
            logging.warning(f"登录状态检测出错: {e}")
            return False

    def trigger_loading(self):
        """Scroll and refresh the page to provoke lazy-loaded API traffic."""
        logging.info('触发数据加载:滚动 + 刷新')
        # Scrolling triggers lazy loading.
        for i in range(8):
            self.driver.execute_script(f'window.scrollTo(0, {i * 900});')
            time.sleep(1.2)
        # A refresh fires a fresh round of API requests.
        self.driver.refresh()
        time.sleep(4)
        for i in range(6):
            self.driver.execute_script(f'window.scrollTo(0, {i * 1200});')
            time.sleep(1.3)

    def format_count(self, n: int) -> str:
        """Format a play count the way Douyin displays it (万 / 亿 units)."""
        if n >= 100_000_000:
            return f"{n/100_000_000:.1f}亿"
        if n >= 10_000:
            return f"{n/10_000:.1f}万"
        return str(n)

    def _trigger_mix_aweme_api(self, mix_id: str):
        """Actively call /aweme/v1/web/mix/aweme/ to fetch a mix's video list.

        The fetch runs inside the page (so cookies apply) and stashes results
        on ``window.mixAwemeResults`` for Python to read back.
        """
        try:
            if not self.driver:
                logging.warning('WebDriver不可用,无法触发API调用')
                return
            logging.info(f'主动触发mix/aweme API调用,获取合集 {mix_id} 的视频列表')
            api_url = f"https://www.douyin.com/aweme/v1/web/mix/aweme/?mix_id={mix_id}&count=20&cursor=0"

            # In-page fetch: parses the JSON response and extracts aweme_ids.
            js_code = f"""
            (async function() {{
                try {{
                    const response = await fetch('{api_url}', {{
                        method: 'GET',
                        credentials: 'include',
                        headers: {{
                            'Accept': 'application/json',
                            'User-Agent': navigator.userAgent
                        }}
                    }});
                    if (response.ok) {{
                        const data = await response.json();
                        console.log('Mix aweme API response for {mix_id}:', data);
                        // Extract the list of aweme_ids.
                        let awemeIds = [];
                        if (data && data.aweme_list && Array.isArray(data.aweme_list)) {{
                            awemeIds = data.aweme_list.map(aweme => aweme.aweme_id).filter(id => id);
                        }} else if (data && data.data && Array.isArray(data.data)) {{
                            awemeIds = data.data.map(aweme => aweme.aweme_id).filter(id => id);
                        }}
                        // Stash the result on window for Python to read.
                        if (!window.mixAwemeResults) {{
                            window.mixAwemeResults = {{}};
                        }}
                        window.mixAwemeResults['{mix_id}'] = {{
                            aweme_ids: awemeIds,
                            total_count: awemeIds.length,
                            raw_data: data
                        }};
                        console.log('Extracted aweme_ids for {mix_id}:', awemeIds);
                        return awemeIds;
                    }} else {{
                        console.error('Mix aweme API failed for {mix_id}:', response.status);
                        return [];
                    }}
                }} catch (error) {{
                    console.error('Mix aweme API error for {mix_id}:', error);
                    return [];
                }}
            }})();
            """
            self.driver.execute_script(js_code)
            # Give the async fetch time to complete before reading the result.
            time.sleep(2)

            try:
                js_get_result = f"""
                return window.mixAwemeResults && window.mixAwemeResults['{mix_id}'] ?
                       window.mixAwemeResults['{mix_id}'] : null;
                """
                stored_result = self.driver.execute_script(js_get_result)
                if stored_result and stored_result.get('aweme_ids'):
                    aweme_ids = stored_result['aweme_ids']
                    logging.info(f'成功获取合集 {mix_id} 的 {len(aweme_ids)} 个视频ID: {aweme_ids[:5]}...')
                    # Defensive: attributes are set in __init__, but keep the
                    # guards so the method also works on partially built objects.
                    if not hasattr(self, 'collected_aweme_ids'):
                        self.collected_aweme_ids = []
                    if not hasattr(self, 'mix_aweme_mapping'):
                        self.mix_aweme_mapping = {}
                    self.mix_aweme_mapping[mix_id] = aweme_ids
                    self.collected_aweme_ids.extend(aweme_ids)
                    logging.info(f'已将 {len(aweme_ids)} 个视频ID添加到合集 {mix_id}')
                else:
                    logging.warning(f'未能获取合集 {mix_id} 的视频ID')
            except Exception as e:
                logging.warning(f'读取JavaScript结果失败: {e}')
            logging.info(f'已完成mix/aweme API调用,mix_id: {mix_id}')
        except Exception as e:
            logging.warning(f'触发mix/aweme API调用失败: {e}')

    def parse_mix_aweme_response(self, text: str, source_url: str,
                                 request_id: str = None):
        """Parse a mix/aweme API response and collect per-video aweme_ids."""
        try:
            if not text.strip():
                return
            try:
                data = json.loads(text)
            except json.JSONDecodeError:
                logging.warning(f'mix/aweme API响应不是有效JSON: {source_url}')
                return

            # Look for the video list under the usual top-level keys first.
            aweme_list = None
            if isinstance(data, dict):
                for key in ['aweme_list', 'data', 'awemes', 'items']:
                    if key in data and isinstance(data[key], list):
                        aweme_list = data[key]
                        break
            # Otherwise search recursively.
            if aweme_list is None:
                aweme_list = self._find_aweme_list_recursive(data)

            if aweme_list and isinstance(aweme_list, list):
                logging.info(f'从mix/aweme API找到 {len(aweme_list)} 个视频')
                # Collect aweme_ids to associate with mixes later.
                aweme_ids = []
                for aweme in aweme_list:
                    if isinstance(aweme, dict):
                        aweme_id = aweme.get('aweme_id', '')
                        if aweme_id:
                            aweme_ids.append(aweme_id)
                            desc = aweme.get('desc', '')
                            if not desc:
                                # Fall back to joined hashtag names for a title.
                                text_extra = aweme.get('text_extra', [])
                                if text_extra and isinstance(text_extra, list):
                                    desc = ' '.join([item.get('hashtag_name', '')
                                                     for item in text_extra
                                                     if isinstance(item, dict)])
                            logging.info(f'找到视频ID: {aweme_id} - {desc[:50]}...')
                if not hasattr(self, 'collected_aweme_ids'):
                    self.collected_aweme_ids = []
                self.collected_aweme_ids.extend(aweme_ids)
                logging.info(f'累计收集到 {len(self.collected_aweme_ids)} 个视频ID')
            else:
                logging.warning(f'mix/aweme API响应中未找到视频列表: {source_url}')
        except Exception as e:
            logging.warning(f'解析mix/aweme API响应时出错: {e}')

    def _find_aweme_list_recursive(self, obj, max_depth=3, current_depth=0):
        """Depth-limited search for a list of aweme dicts inside *obj*."""
        if current_depth >= max_depth:
            return None
        if isinstance(obj, dict):
            for key, value in obj.items():
                if 'aweme' in key.lower() and isinstance(value, list):
                    # Verify the list actually holds aweme objects.
                    if value and isinstance(value[0], dict) and 'aweme_id' in value[0]:
                        return value
                if isinstance(value, (dict, list)):
                    result = self._find_aweme_list_recursive(value, max_depth,
                                                             current_depth + 1)
                    if result:
                        return result
        elif isinstance(obj, list):
            for item in obj:
                if isinstance(item, (dict, list)):
                    result = self._find_aweme_list_recursive(item, max_depth,
                                                             current_depth + 1)
                    if result:
                        return result
        return None

    def parse_play_vv_from_text(self, text: str, source_url: str,
                                request_id: str = None):
        """Extract play_vv / mix_name / related info from raw response text."""
        try:
            # Structured JSON first.
            if text.strip().startswith('{') or text.strip().startswith('['):
                try:
                    data = json.loads(text)
                    self._extract_from_json_data(data, source_url, request_id)
                    return
                except json.JSONDecodeError:
                    pass
            # Not JSON — fall back to regex scanning.
            self._extract_from_text_regex(text, source_url, request_id)
        except Exception as e:
            logging.warning(f'解析文本数据时出错: {e}')

    def _extract_from_json_data(self, data, source_url: str,
                                request_id: str = None):
        """Recursively walk JSON data and collect mix objects with play_vv."""

        def extract_mix_info(obj, path=""):
            if isinstance(obj, dict):
                # A mix object carries both mix_id and a statis sub-object.
                if 'mix_id' in obj and 'statis' in obj:
                    mix_id = obj.get('mix_id', '')
                    mix_name = obj.get('mix_name', '')
                    statis = obj.get('statis', {})

                    # Debug: dump the structure of the first few mix objects.
                    if len(self.play_vv_items) < 3:
                        logging.info(f"=== 调试:合集对象结构 ===")
                        logging.info(f"完整对象键: {list(obj.keys())}")
                        # Fields that might hold video references.
                        for key, value in obj.items():
                            if ('aweme' in key.lower() or 'video' in key.lower()
                                    or 'item' in key.lower() or 'ids' in key.lower()):
                                logging.info(f"可能的视频字段 {key}: {type(value)} - {str(value)[:200]}")
                        # The ids field gets extra attention.
                        if 'ids' in obj:
                            ids_value = obj['ids']
                            logging.info(f"ids字段详细信息: {type(ids_value)} - {ids_value}")
                            if isinstance(ids_value, list) and len(ids_value) > 0:
                                logging.info(f"ids列表长度: {len(ids_value)}")
                                logging.info(f"第一个ID: {ids_value[0]}")
                                if len(ids_value) > 1:
                                    logging.info(f"第二个ID: {ids_value[1]}")

                    if isinstance(statis, dict) and 'play_vv' in statis:
                        play_vv = statis.get('play_vv')
                        if isinstance(play_vv, (int, str)) and str(play_vv).isdigit():
                            vv = int(play_vv)
                            # Build the collection URL from the mix id.
                            video_url = f"https://www.douyin.com/collection/{mix_id}" if mix_id else ""
                            # Map the mix to previously collected aweme_ids.
                            mix_aweme_mapping = getattr(self, 'mix_aweme_mapping', {})
                            aweme_ids = mix_aweme_mapping.get(mix_id, [])

                            # Cover image: store the full primary URL plus
                            # any backup URLs; probe the known field names.
                            cover_image_url = ""
                            cover_image_backup_urls = []
                            if 'cover' in obj:
                                cover = obj['cover']
                                if isinstance(cover, dict) and 'url_list' in cover and cover['url_list']:
                                    cover_image_url = cover['url_list'][0]
                                    cover_image_backup_urls = cover['url_list'][1:] if len(cover['url_list']) > 1 else []
                                elif isinstance(cover, str):
                                    cover_image_url = cover
                            elif 'cover_url' in obj:
                                cover_url = obj['cover_url']
                                if isinstance(cover_url, dict) and 'url_list' in cover_url and cover_url['url_list']:
                                    cover_image_url = cover_url['url_list'][0]
                                    cover_image_backup_urls = cover_url['url_list'][1:] if len(cover_url['url_list']) > 1 else []
                                elif isinstance(cover_url, str):
                                    cover_image_url = cover_url
                            elif 'image' in obj:
                                image = obj['image']
                                if isinstance(image, dict) and 'url_list' in image and image['url_list']:
                                    cover_image_url = image['url_list'][0]
                                    cover_image_backup_urls = image['url_list'][1:] if len(image['url_list']) > 1 else []
                                elif isinstance(image, str):
                                    cover_image_url = image
                            elif 'pic' in obj:
                                pic = obj['pic']
                                if isinstance(pic, dict) and 'url_list' in pic and pic['url_list']:
                                    cover_image_url = pic['url_list'][0]
                                    cover_image_backup_urls = pic['url_list'][1:] if len(pic['url_list']) > 1 else []
                                elif isinstance(pic, str):
                                    cover_image_url = pic

                            self.play_vv_items.append({
                                'play_vv': vv,
                                'formatted': self.format_count(vv),
                                'url': source_url,
                                'request_id': request_id,
                                'mix_name': mix_name,
                                'video_url': video_url,          # collection URL
                                'mix_id': mix_id,                # collection id
                                'aweme_ids': aweme_ids.copy() if aweme_ids else [],  # videos in this mix
                                'cover_image_url': cover_image_url,          # primary cover URL
                                'cover_backup_urls': cover_image_backup_urls,  # backup cover URLs
                                'timestamp': datetime.now().isoformat(),
                            })
                            logging.info(f'提取到合集: {mix_name} (ID: {mix_id}, 包含{len(aweme_ids)}个视频) - {vv:,} 播放量')
                            # No known videos yet: fetch the mix's video list.
                            if not aweme_ids and mix_id:
                                self._trigger_mix_aweme_api(mix_id)

                # Recurse into children.
                for key, value in obj.items():
                    if isinstance(value, (dict, list)):
                        extract_mix_info(value, f"{path}.{key}" if path else key)
            elif isinstance(obj, list):
                for i, item in enumerate(obj):
                    if isinstance(item, (dict, list)):
                        extract_mix_info(item, f"{path}[{i}]" if path else f"[{i}]")

        extract_mix_info(data)

    def _extract_from_text_regex(self, text: str, source_url: str,
                                 request_id: str = None):
        """Regex fallback: pull mix info and lone play_vv values from text."""
        # JSON fragments that carry the full mix info in one object.
        mix_pattern = r'\{[^{}]*"mix_id"\s*:\s*"([^"]*)"[^{}]*"mix_name"\s*:\s*"([^"]*)"[^{}]*"statis"\s*:\s*\{[^{}]*"play_vv"\s*:\s*(\d+)[^{}]*\}[^{}]*\}'
        for match in re.finditer(mix_pattern, text):
            try:
                mix_id = match.group(1)
                mix_name = match.group(2)
                vv = int(match.group(3))
                video_url = f"https://www.douyin.com/collection/{mix_id}" if mix_id else ""
                mix_aweme_mapping = getattr(self, 'mix_aweme_mapping', {})
                aweme_ids = mix_aweme_mapping.get(mix_id, [])
                self.play_vv_items.append({
                    'play_vv': vv,
                    'formatted': self.format_count(vv),
                    'url': source_url,
                    'request_id': request_id,
                    'mix_name': mix_name,
                    'video_url': video_url,   # collection URL
                    'mix_id': mix_id,         # collection id
                    'aweme_ids': aweme_ids.copy() if aweme_ids else [],  # videos in this mix
                    'timestamp': datetime.now().isoformat(),
                })
                logging.info(f'正则提取到合集: {mix_name} (ID: {mix_id}, 包含{len(aweme_ids)}个视频) - {vv:,} 播放量')
                # No known videos yet: fetch the mix's video list.
                if not aweme_ids and mix_id:
                    self._trigger_mix_aweme_api(mix_id)
            except Exception:
                continue

        # Last resort: bare play_vv values without surrounding mix context.
        for match in re.findall(r'"play_vv"\s*:\s*(\d+)', text):
            try:
                vv = int(match)
                # Skip values we already recorded.
                if not any(item['play_vv'] == vv for item in self.play_vv_items):
                    aweme_ids = getattr(self, 'collected_aweme_ids', [])
                    self.play_vv_items.append({
                        'play_vv': vv,
                        'formatted': self.format_count(vv),
                        'url': source_url,
                        'request_id': request_id,
                        'mix_name': '',   # unknown mix name
                        'video_url': '',  # unknown URL
                        'mix_id': '',     # unknown mix_id
                        'aweme_ids': aweme_ids.copy() if aweme_ids else [],  # all collected ids
                        'timestamp': datetime.now().isoformat(),
                    })
            except Exception:
                continue

    def collect_network_bodies(self, duration_s: int = None):
        """Poll CDP performance logs and harvest matching response bodies.

        Runs for *duration_s* seconds (defaults to the instance setting),
        feeding bodies into the mix/aweme or play_vv parsers.
        """
        if duration_s is None:
            duration_s = self.duration_s
        logging.info(f'开始收集网络响应体,持续 {duration_s}s')
        start = time.time()
        known_request_ids = set()
        # Keywords marking URLs of interest (favorites / mixes / videos).
        url_keywords = ['aweme', 'mix', 'collection', 'favorite', 'note', 'api']
        last_progress = 0
        while time.time() - start < duration_s:
            try:
                logs = self.driver.get_log('performance')
            except Exception as e:
                logging.warning(f'获取性能日志失败: {e}')
                time.sleep(1)
                continue
            for entry in logs:
                try:
                    message = json.loads(entry['message'])['message']
                except Exception:
                    continue
                method = message.get('method')
                params = message.get('params', {})
                # Record interesting request URLs.
                if method == 'Network.requestWillBeSent':
                    req_id = params.get('requestId')
                    url = params.get('request', {}).get('url', '')
                    if any(k in url for k in url_keywords):
                        self.captured_responses.append(
                            {'requestId': req_id, 'url': url, 'type': 'request'})
                # Response arrived: try to pull the body.
                if method == 'Network.responseReceived':
                    req_id = params.get('requestId')
                    url = params.get('response', {}).get('url', '')
                    type_ = params.get('type')  # XHR, Fetch, Document
                    if req_id and req_id not in known_request_ids:
                        known_request_ids.add(req_id)
                        # Only XHR/Fetch responses are parsed.
                        if type_ in ('XHR', 'Fetch') and any(k in url for k in url_keywords):
                            try:
                                body_obj = self.driver.execute_cdp_cmd(
                                    'Network.getResponseBody',
                                    {'requestId': req_id})
                                body_text = body_obj.get('body', '')
                                # The body may come back base64-encoded.
                                if body_obj.get('base64Encoded'):
                                    try:
                                        body_text = base64.b64decode(body_text).decode('utf-8', errors='ignore')
                                    except Exception:
                                        pass
                                # mix/aweme responses get their own parser.
                                if '/aweme/v1/web/mix/aweme/' in url:
                                    self.parse_mix_aweme_response(body_text, url, req_id)
                                else:
                                    self.parse_play_vv_from_text(body_text, url, req_id)
                            except Exception:
                                # Some bodies are unavailable or too large.
                                pass
            elapsed = int(time.time() - start)
            if elapsed - last_progress >= 5:
                last_progress = elapsed
                logging.info(f'进度: {elapsed}/{duration_s}s, 已发现play_vv候选 {len(self.play_vv_items)}')
            time.sleep(0.8)
        logging.info(f'网络收集完成,共发现 {len(self.play_vv_items)} 个play_vv候选')
        # Back-fill aweme_ids on any items captured before the mapping filled.
        self._update_aweme_ids_for_existing_items()

    def _update_aweme_ids_for_existing_items(self):
        """Refresh the aweme_ids field of collected items from the mapping."""
        if not hasattr(self, 'mix_aweme_mapping') or not self.mix_aweme_mapping:
            logging.info('没有mix_aweme_mapping数据,跳过aweme_ids更新')
            return
        updated_count = 0
        for item in self.play_vv_items:
            mix_id = item.get('mix_id')
            if mix_id and mix_id in self.mix_aweme_mapping:
                aweme_ids = self.mix_aweme_mapping[mix_id]
                if aweme_ids and len(aweme_ids) > 0:
                    item['aweme_ids'] = aweme_ids.copy()
                    updated_count += 1
                    logging.info(f'更新合集 {item.get("mix_name", "未知")} (ID: {mix_id}) 的aweme_ids,包含 {len(aweme_ids)} 个视频')
        logging.info(f'已更新 {updated_count} 个条目的aweme_ids字段')

    def parse_ssr_data(self):
        """Parse server-side-rendered data embedded in the current page."""
        logging.info('尝试解析页面SSR数据')
        # Try the known window globals first.
        keys = ['_SSR_HYDRATED_DATA', 'RENDER_DATA']
        for key in keys:
            try:
                data = self.driver.execute_script(f'return window.{key}')
                if data:
                    text = json.dumps(data, ensure_ascii=False)
                    self.parse_play_vv_from_text(text, f'page_{key}', None)
                    logging.info(f'从 {key} 中解析完成')
            except Exception:
                continue
        # Fall back to regexing the raw page source.
        try:
            page_source = self.driver.page_source
            self.parse_play_vv_from_text(page_source, 'page_source', None)
            # Also look for play_vv inside statis structures.
            for m in re.findall(r'"statis"\s*:\s*\{[^}]*"play_vv"\s*:\s*(\d+)[^}]*\}', page_source):
                try:
                    vv = int(m)
                    # Skip values we already recorded.
                    if not any(item['play_vv'] == vv for item in self.play_vv_items):
                        self.play_vv_items.append({
                            'play_vv': vv,
                            'formatted': self.format_count(vv),
                            'url': 'page_source_statis',
                            'request_id': None,
                            'mix_name': '',   # not recoverable from statis
                            'video_url': '',  # not recoverable from statis
                            'timestamp': datetime.now().isoformat(),
                        })
                except Exception:
                    pass
        except Exception:
            pass

    def dedupe(self):
        """Deduplicate collected items by their numeric play_vv value."""
        unique = []
        seen = set()
        for item in self.play_vv_items:
            vv = item['play_vv']
            if vv not in seen:
                unique.append(item)
                seen.add(vv)
        self.play_vv_items = unique

    def save_results(self):
        """Write results to timestamped JSON/TXT files and to MongoDB."""
        ts = datetime.now().strftime('%Y%m%d_%H%M%S')
        # Output goes under a data/ folder next to this script.
        script_dir = os.path.dirname(os.path.abspath(__file__))
        data_dir = os.path.join(script_dir, 'data')
        os.makedirs(data_dir, exist_ok=True)
        json_file = os.path.join(data_dir, f'douyin_cdp_play_vv_{ts}.json')
        txt_file = os.path.join(data_dir, f'douyin_cdp_play_vv_{ts}.txt')

        # JSON dump of everything we collected.
        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump({
                'timestamp': ts,
                'start_url': self.start_url,
                'play_vv_items': self.play_vv_items,
                'captured_count': len(self.play_vv_items),
            }, f, ensure_ascii=False, indent=2)

        # Human-readable TXT report.
        with open(txt_file, 'w', encoding='utf-8') as f:
            f.write('抖音收藏合集真实播放量(play_vv) - Selenium+CDP\n')
            f.write('=' * 60 + '\n\n')
            if self.play_vv_items:
                sorted_items = sorted(self.play_vv_items,
                                      key=lambda x: x['play_vv'], reverse=True)
                f.write(f"✅ 提取到 {len(sorted_items)} 个唯一play_vv数值\n\n")
                for i, item in enumerate(sorted_items, 1):
                    mix_info = f" - {item.get('mix_name', '未知合集')}" if item.get('mix_name') else ""
                    video_info = f" (链接: {item.get('video_url', '未知')})" if item.get('video_url') else ""
                    f.write(f"{i}. play_vv: {item['play_vv']:,} ({item['formatted']}){mix_info}{video_info}\n")
                    f.write(f"   来源: {item['url']}\n\n")
                total = sum(x['play_vv'] for x in sorted_items)
                f.write(f"📊 总播放量: {total:,}\n")
                f.write(f"📈 最高播放量: {sorted_items[0]['play_vv']:,} ({sorted_items[0]['formatted']})\n")
            else:
                f.write('❌ 未能提取到play_vv数值\n')
                f.write('可能原因:\n')
                f.write('- 仍需登录或权限受限\n')
                f.write('- API响应体不可读取或被加密\n')
                f.write('- 页面结构或接口策略发生变更\n')

        # Persist to MongoDB as well (no-op when not connected).
        self.save_to_mongodb()
        logging.info('结果已保存: %s, %s', json_file, txt_file)

    def save_to_mongodb(self):
        """Insert the collected items into MongoDB, ranked by play count."""
        if self.mongo_client is None or self.collection is None:
            logging.warning('MongoDB未连接,跳过数据库保存')
            return
        if not self.play_vv_items:
            logging.info('没有数据需要保存到MongoDB')
            return
        try:
            batch_time = datetime.now()
            documents = []
            for item in self.play_vv_items:
                # The requested fields, plus aweme_ids (episode id list) and
                # cover_image_url (full cover URL).
                doc = {
                    'batch_time': batch_time,
                    'mix_name': item.get('mix_name', ''),
                    'video_url': item.get('video_url', ''),
                    'playcount': item.get('formatted', ''),
                    'play_vv': item.get('play_vv', 0),
                    'request_id': item.get('request_id', ''),
                    'rank': 0,  # placeholder; recomputed after sorting
                    'aweme_ids': item.get('aweme_ids', []),          # episode ids
                    'cover_image_url': item.get('cover_image_url', ''),  # primary cover URL
                    'cover_backup_urls': item.get('cover_backup_urls', []),  # backup cover URLs
                }
                documents.append(doc)

            # Rank documents by descending play count.
            documents.sort(key=lambda x: x['play_vv'], reverse=True)
            for i, doc in enumerate(documents, 1):
                doc['rank'] = i

            result = self.collection.insert_many(documents)
            logging.info(f'成功保存 {len(result.inserted_ids)} 条记录到MongoDB')

            # Summary statistics for the log.
            total_play_vv = sum(doc['play_vv'] for doc in documents)
            max_play_vv = max(doc['play_vv'] for doc in documents) if documents else 0
            logging.info(f'MongoDB保存统计: 总播放量={total_play_vv:,}, 最高播放量={max_play_vv:,}')
            logging.info(f'保存的字段: batch_time, mix_name, video_url, playcount, play_vv, request_id, rank, aweme_ids, cover_image_url, cover_backup_urls')
            cover_count = sum(1 for doc in documents if doc.get('cover_image_url'))
            backup_count = sum(1 for doc in documents if doc.get('cover_backup_urls'))
            logging.info(f'封面图片统计: {cover_count}/{len(documents)} 个合集有主封面链接, {backup_count} 个合集有备用链接')
            total_episodes = sum(len(doc.get('aweme_ids', [])) for doc in documents)
            logging.info(f'短剧集数统计: 总共收集到 {total_episodes} 集视频ID')
        except Exception as e:
            logging.error(f'保存到MongoDB时出错: {e}')

    def run(self):
        """Full pipeline: driver setup, login, capture, parse, save."""
        try:
            self.setup_driver()
            self.navigate()
            self.ensure_login()
            self.trigger_loading()
            self.collect_network_bodies()
            self.parse_ssr_data()
            self.dedupe()
            self.save_results()
            logging.info('完成,play_vv数量: %d', len(self.play_vv_items))
        finally:
            # Always shut down the browser, even on failure.
            if self.driver:
                try:
                    self.driver.quit()
                except Exception:
                    pass


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Selenium+CDP 抖音play_vv抓取器')
    parser.add_argument('--url',
                        default='https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation',
                        help='收藏合集列表页面URL')
    parser.add_argument('--auto', action='store_true', help='自动继续,跳过回车等待')
    parser.add_argument('--duration', type=int, default=60, help='网络响应收集时长(秒)')
    parser.add_argument('--driver', help='覆盖chromedriver路径')
    args = parser.parse_args()

    if args.driver:
        os.environ['OVERRIDE_CHROMEDRIVER'] = args.driver
    if args.auto:
        os.environ['AUTO_CONTINUE'] = '1'

    print('=== Selenium+CDP 抖音play_vv抓取器 ===')
    print('将复用本地Chrome配置并抓取网络响应中的play_vv')
    scraper = DouyinPlayVVScraper(args.url, auto_continue=args.auto,
                                  duration_s=args.duration)
    scraper.run()