#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Selenium + Chrome DevTools Protocol scraper for the real play counts (play_vv)
of Douyin favorite collections.

Core capabilities:
- Enable CDP network events, fetch response bodies, and parse play_vv
- Reuse the local Chrome user data to skip the login hurdle
- Scroll and refresh automatically to trigger more API requests
- Also parse SSR data embedded in the page (window._SSR_HYDRATED_DATA/RENDER_DATA)

Usage:
1) By default, reuse the logged-in Chrome profile under `config/chrome_profile`.
2) If login is still required, complete it in the Chrome window that opens,
   then return to the terminal and type 'ok' followed by Enter.
3) The script scrolls and refreshes, collecting network data and extracting
   play_vv automatically.
"""

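# Example invocations (the file name is hypothetical; the flags match the
# argparse options defined at the bottom of this file):
#   python douyin_playvv_scraper.py --auto --duration 90
#   python douyin_playvv_scraper.py --driver drivers/chromedriver.exe
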
import json
import re
import subprocess
import time
import logging
import os
import shutil
from datetime import datetime

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
# Kept as an import but unused by default, to avoid webdriver_manager
# stalling on network downloads
from webdriver_manager.chrome import ChromeDriverManager  # noqa: F401
import chromedriver_autoinstaller  # noqa: F401  (unused fallback)
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure


# Configure logging; make sure the logs directory exists first
script_dir = os.path.dirname(os.path.abspath(__file__))
logs_dir = os.path.join(script_dir, 'logs')
os.makedirs(logs_dir, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format='[%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(logs_dir, 'douyin_scraper.log'), encoding='utf-8'),
        logging.StreamHandler()
    ]
)


class DouyinPlayVVScraper:
    def __init__(self, start_url: str = None, auto_continue: bool = False, duration_s: int = 60):
        self.start_url = start_url or "https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation"
        self.auto_continue = auto_continue
        self.duration_s = duration_s
        self.driver = None
        self.play_vv_items = []  # list of dicts: {play_vv, formatted, url, request_id, mix_name, watched_item}
        self.captured_responses = []
        self.mongo_client = None
        self.db = None
        self.collection = None
        self._cleanup_old_profiles()
        self._setup_mongodb()

    def _setup_mongodb(self):
        """Set up the MongoDB connection"""
        try:
            # MongoDB connection settings
            mongo_host = os.environ.get('MONGO_HOST', 'localhost')
            mongo_port = int(os.environ.get('MONGO_PORT', 27017))
            mongo_db = os.environ.get('MONGO_DB', 'Rankings')
            mongo_collection = os.environ.get('MONGO_COLLECTION', 'Rankings_list')

            # Create the MongoDB client
            self.mongo_client = MongoClient(mongo_host, mongo_port, serverSelectionTimeoutMS=5000)

            # Verify the connection
            self.mongo_client.admin.command('ping')

            # Select database and collection
            self.db = self.mongo_client[mongo_db]
            self.collection = self.db[mongo_collection]

            logging.info(f'MongoDB connected: {mongo_host}:{mongo_port}/{mongo_db}.{mongo_collection}')

        except ConnectionFailure as e:
            logging.warning(f'MongoDB connection failed: {e}')
            logging.info('Results will only be saved locally')
            self.mongo_client = None
            self.db = None
            self.collection = None
        except Exception as e:
            logging.warning(f'MongoDB setup error: {e}')
            self.mongo_client = None
            self.db = None
            self.collection = None

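    # The connection is driven entirely by environment variables, so pointing
    # the scraper at another MongoDB instance needs no code change (shell
    # syntax and values below are illustrative):
    #   MONGO_HOST=10.0.0.5 MONGO_DB=Rankings MONGO_COLLECTION=Rankings_list \
    #       python douyin_playvv_scraper.py --auto
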
    def _cleanup_old_profiles(self):
        """Remove temporary Chrome profiles older than one day"""
        try:
            script_dir = os.path.dirname(os.path.abspath(__file__))
            profile_base_dir = os.path.join(script_dir, 'config', 'chrome_profile')
            if not os.path.exists(profile_base_dir):
                return

            current_time = time.time()
            one_day_ago = current_time - 24 * 60 * 60  # 24 hours ago

            for item in os.listdir(profile_base_dir):
                if item.startswith('run_'):
                    item_path = os.path.join(profile_base_dir, item)
                    if os.path.isdir(item_path):
                        try:
                            # Extract the timestamp from the directory name
                            timestamp = int(item.split('_')[1])
                            if timestamp < one_day_ago:
                                shutil.rmtree(item_path, ignore_errors=True)
                                logging.info(f'Removed old profile: {item}')
                        except (ValueError, IndexError):
                            # Skip entries whose timestamp cannot be parsed
                            continue
        except Exception as e:
            logging.warning(f'Error while cleaning old profiles: {e}')

    def _cleanup_chrome_processes(self):
        """Kill Chrome processes that may be holding the profile directory"""
        try:
            import subprocess
            import psutil

            # Resolve the profile path used by this script
            script_dir = os.path.dirname(os.path.abspath(__file__))
            profile_dir = os.path.join(script_dir, 'config', 'chrome_profile', 'douyin_persistent')

            # Find Chrome processes whose command line references that profile
            killed_processes = []
            for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
                try:
                    if proc.info['name'] and 'chrome' in proc.info['name'].lower():
                        cmdline = proc.info['cmdline']
                        if cmdline and any(profile_dir in arg for arg in cmdline):
                            proc.terminate()
                            killed_processes.append(proc.info['pid'])
                            logging.info(f'Terminated Chrome process holding the profile: PID {proc.info["pid"]}')
                except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                    continue

            # Give terminated processes time to exit
            if killed_processes:
                time.sleep(2)

            return len(killed_processes) > 0

        except ImportError:
            # Without psutil, fall back to the system command. Note that this
            # kills every chrome.exe, not only the one holding the profile.
            try:
                result = subprocess.run(['taskkill', '/f', '/im', 'chrome.exe'],
                                        capture_output=True, text=True, timeout=10)
                if result.returncode == 0:
                    logging.info('Cleaned up Chrome processes via taskkill')
                    time.sleep(2)
                    return True
                return False
            except Exception as e:
                logging.warning(f'Failed to clean up Chrome processes: {e}')
                return False
        except Exception as e:
            logging.warning(f'Error while cleaning up Chrome processes: {e}')
            return False

    def setup_driver(self):
        logging.info('Initializing Chrome WebDriver (CDP network logging enabled)')

        # Kill Chrome processes that may be holding the profile directory
        self._cleanup_chrome_processes()

        chrome_options = Options()
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
        chrome_options.add_experimental_option('useAutomationExtension', False)
        chrome_options.add_argument('--disable-extensions')
        chrome_options.add_argument('--remote-allow-origins=*')
        chrome_options.add_argument('--remote-debugging-port=0')
        chrome_options.add_argument('--start-maximized')
        chrome_options.add_argument('--lang=zh-CN')
        # Use a fixed Chrome profile directory so the login session persists
        script_dir = os.path.dirname(os.path.abspath(__file__))
        profile_dir = os.path.join(script_dir, 'config', 'chrome_profile', 'douyin_persistent')
        os.makedirs(profile_dir, exist_ok=True)
        chrome_options.add_argument(f'--user-data-dir={profile_dir}')
        logging.info(f'Using persistent Chrome profile: {profile_dir}')
        # Set the Chrome binary path explicitly (common 32-bit and 64-bit
        # install locations)
        possible_chrome_bins = [
            r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe",
            r"C:\Program Files\Google\Chrome\Application\chrome.exe"
        ]
        for bin_path in possible_chrome_bins:
            if os.path.exists(bin_path):
                chrome_options.binary_location = bin_path
                logging.info(f'Using Chrome binary: {bin_path}')
                break
        # Performance log (Network events)
        chrome_options.set_capability('goog:loggingPrefs', {'performance': 'ALL'})

        # Only use a local chromedriver or one on PATH, to avoid depending on
        # network downloads
        driver_ready = False
        candidates = []
        # The driver path can be overridden via an environment variable
        env_override = os.environ.get('OVERRIDE_CHROMEDRIVER')
        if env_override:
            candidates.append(env_override)
            logging.info(f'OVERRIDE_CHROMEDRIVER is set, trying it first: {env_override}')
        # drivers/ next to this script (preferred)
        script_dir = os.path.dirname(os.path.abspath(__file__))
        script_driver_path = os.path.join(script_dir, 'drivers', 'chromedriver.exe')
        candidates.append(script_driver_path)
        logging.info(f'Trying script-relative path first: {script_driver_path}')

        # drivers/ under the project root
        user_driver_path = os.path.join(os.getcwd(), 'drivers', 'chromedriver.exe')
        candidates.append(user_driver_path)
        logging.info(f'Trying project-root path: {user_driver_path}')

        # Project root itself
        candidates.append(os.path.join(os.getcwd(), 'chromedriver.exe'))
        # Other likely locations
        candidates.append(os.path.join(os.getcwd(), 'drivers', 'chromedriver'))
        # chromedriver on PATH
        which_path = shutil.which('chromedriver')
        if which_path:
            candidates.append(which_path)

        for p in candidates:
            try:
                if p and os.path.exists(p):
                    logging.info(f'Trying chromedriver: {p}')
                    service = Service(p)
                    self.driver = webdriver.Chrome(service=service, options=chrome_options)
                    driver_ready = True
                    logging.info(f'chromedriver started successfully: {p}')
                    try:
                        caps = self.driver.capabilities
                        browser_ver = caps.get('browserVersion') or caps.get('version')
                        cdver = caps.get('chrome', {}).get('chromedriverVersion')
                        logging.info(f'Chrome version: {browser_ver}, ChromeDriver version: {cdver}')
                    except Exception:
                        pass
                    break
                else:
                    logging.info(f'Candidate path does not exist: {p}')
            except Exception as e:
                logging.warning(f'Failed to start with {p}: {e}')

        if not driver_ready:
            # Last resort: webdriver-manager (may require network access)
            try:
                service = Service(ChromeDriverManager().install())
                self.driver = webdriver.Chrome(service=service, options=chrome_options)
                driver_ready = True
                logging.info('ChromeDriver started via webdriver-manager')
            except Exception as e:
                raise RuntimeError('Could not start ChromeDriver. Manually download a matching chromedriver into the project root or PATH, or check the network so webdriver-manager can download it. Error: ' + str(e))

        # Anti-detection
        try:
            self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
        except Exception:
            pass

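        # Note: execute_script only patches the document that is currently
        # loaded; after self.driver.refresh() in trigger_loading() the
        # navigator.webdriver property is restored. A sketch of a variant that
        # survives navigations, via the CDP command
        # Page.addScriptToEvaluateOnNewDocument (not used by this script):
        #   self.driver.execute_cdp_cmd(
        #       'Page.addScriptToEvaluateOnNewDocument',
        #       {'source': "Object.defineProperty(navigator, 'webdriver', "
        #                  "{get: () => undefined})"})
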
        # Enable the CDP Network domain
        try:
            self.driver.execute_cdp_cmd('Network.enable', {})
            logging.info('CDP Network enabled')
        except Exception as e:
            logging.warning(f'Failed to enable CDP Network: {e}')

    def navigate(self):
        logging.info(f'Navigating to: {self.start_url}')
        self.driver.get(self.start_url)
        time.sleep(8)  # extra wait for the page to load

    def ensure_login(self):
        """Make sure the user is logged in and on the favorite-collections page"""
        logging.info("Checking login state and page location...")

        # First check whether we are already logged in and on the right page
        if self._check_login_and_page():
            logging.info("Already logged in and on the favorite-collections page, skipping manual confirmation")
            return

        # Otherwise fall back to the manual login flow
        logging.info("Please complete the login manually in the browser window.")

        if self.auto_continue:
            logging.info('Auto-continue mode, skipping the manual wait...')
            time.sleep(5)
            return

        logging.info("Entering the manual login confirmation loop...")
        while True:
            # Require the user to type a specific confirmation
            logging.info("Waiting for user confirmation...")
            user_input = input("Complete the login in the browser and navigate to [Me] → [Favorites] → [Collections]. When done, type 'ok' here and press Enter: ")

            if user_input.strip().lower() != 'ok':
                logging.warning("Please type 'ok' to confirm you have logged in and navigated to [Me] → [Favorites] → [Collections].")
                continue

            logging.info("Confirmed by user, checking the current page...")

            try:
                current_url = self.driver.current_url
                logging.info(f"Current page URL: {current_url}")
                if ("douyin.com/user/self" in current_url and
                        ("favorite_collection" in current_url or "compilation" in current_url)):
                    logging.info(f"Confirmed you are on the favorite-collections list page: {current_url}")
                    logging.info("The script will continue...")
                    break
                else:
                    # The user confirmed, but the page is wrong; keep looping
                    logging.warning(f"The current page ({current_url}) is not the favorite-collections list page. Make sure you have navigated to [Me] → [Favorites] → [Collections].")

            except Exception as e:
                if "browser has been closed" in str(e) or "no such window" in str(e) or "target window already closed" in str(e):
                    logging.error("The browser window was closed; the script cannot continue.")
                    raise RuntimeError("Browser window was closed")
                logging.warning(f"Error while checking the URL: {e}. Please retry.")
            time.sleep(1)

    def _check_login_and_page(self, timeout: int = 600) -> bool:
        """Check whether the user is logged in and on the right page"""
        try:
            current_url = self.driver.current_url
            logging.info(f"Current page URL: {current_url}")

            # Are we on the favorite-collections page?
            if ("douyin.com/user/self" in current_url and
                    ("favorite_collection" in current_url or "compilation" in current_url)):
                # Then just verify the login state
                return self._detect_login_status(timeout)
            else:
                # Not on the right page; if logged in, navigate there
                if self._detect_login_status(timeout):
                    logging.info("Logged in but not on the favorite-collections page, navigating automatically...")
                    self.driver.get(self.start_url)
                    time.sleep(3)
                    return True
                return False
        except Exception as e:
            logging.warning(f"Error while checking login state: {e}")
            return False

    def _detect_login_status(self, timeout: int = 600) -> bool:
        """Detect automatically whether the user is logged in"""
        try:
            start = time.time()
            while time.time() - start < timeout:
                time.sleep(2)
                # Several selectors that indicate a logged-in state
                selectors = [
                    '[data-e2e="user-avatar"]',
                    '.user-avatar',
                    '[class*="avatar"]',
                    '[class*="Avatar"]'
                ]

                for selector in selectors:
                    try:
                        elements = self.driver.find_elements("css selector", selector)
                        if elements:
                            logging.info("User avatar detected, login confirmed")
                            return True
                    except Exception:
                        continue

                # A login button indicates the user is not logged in
                login_selectors = [
                    '[data-e2e="login-button"]',
                    'button[class*="login"]',
                    'a[href*="login"]'
                ]

                for selector in login_selectors:
                    try:
                        elements = self.driver.find_elements("css selector", selector)
                        if elements:
                            logging.info("Login button detected, user is not logged in")
                            return False
                    except Exception:
                        continue

            logging.info("Login detection timed out, assuming not logged in")
            return False
        except Exception as e:
            logging.warning(f"Error while detecting login state: {e}")
            return False

    def trigger_loading(self):
        logging.info('Triggering data loading: scroll + refresh')
        # Scroll to trigger lazy loading
        for i in range(8):
            self.driver.execute_script(f'window.scrollTo(0, {i * 900});')
            time.sleep(1.2)
        # Refresh to trigger fresh requests
        self.driver.refresh()
        time.sleep(4)
        for i in range(6):
            self.driver.execute_script(f'window.scrollTo(0, {i * 1200});')
            time.sleep(1.3)

    def format_count(self, n: int) -> str:
        if n >= 100_000_000:
            return f"{n/100_000_000:.1f}亿"
        if n >= 10_000:
            return f"{n/10_000:.1f}万"
        return str(n)

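    # Sanity checks for format_count, using the Chinese count units the site
    # itself displays (万 = 10^4, 亿 = 10^8); values verified by hand:
    #   format_count(9_999)        -> '9999'
    #   format_count(12_345)       -> '1.2万'
    #   format_count(250_000_000)  -> '2.5亿'
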
    def parse_play_vv_from_text(self, text: str, source_url: str, request_id: str = None):
        """Parse play_vv, mix_name and watched_item information out of a text blob"""
        try:
            # Try to parse the blob as JSON first
            if text.strip().startswith('{') or text.strip().startswith('['):
                try:
                    data = json.loads(text)
                    self._extract_from_json_data(data, source_url, request_id)
                    return
                except json.JSONDecodeError:
                    pass

            # Not JSON: fall back to regular expressions
            self._extract_from_text_regex(text, source_url, request_id)

        except Exception as e:
            logging.warning(f'Error while parsing text data: {e}')

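    # For reference, the JSON fragments the extractors below target look
    # roughly like this (field names taken from the extraction logic; values
    # are illustrative only):
    #   {"mix_id": "7123456789012345678", "mix_name": "...",
    #    "statis": {"play_vv": 1234567},
    #    "cover": {"url_list": ["https://.../cover.jpg", "https://.../alt.jpg"]}}
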
    def _extract_from_json_data(self, data, source_url: str, request_id: str = None):
        """Recursively extract collection info from JSON data"""
        def extract_mix_info(obj, path=""):
            if isinstance(obj, dict):
                # Does this object describe a collection?
                if 'mix_id' in obj and 'statis' in obj:
                    mix_id = obj.get('mix_id', '')
                    mix_name = obj.get('mix_name', '')
                    statis = obj.get('statis', {})

                    # Debug: dump the structure of objects containing mix_id
                    # (only for the first 3 items)
                    if len(self.play_vv_items) < 3:
                        logging.info("=== Debug: collection object structure ===")
                        logging.info(f"Object keys: {list(obj.keys())}")
                        # Look for fields that may reference videos
                        for key, value in obj.items():
                            if 'aweme' in key.lower() or 'video' in key.lower() or 'item' in key.lower() or 'ids' in key.lower():
                                logging.info(f"Possible video field {key}: {type(value)} - {str(value)[:200]}")

                        # Inspect the ids field in particular
                        if 'ids' in obj:
                            ids_value = obj['ids']
                            logging.info(f"ids field details: {type(ids_value)} - {ids_value}")
                            if isinstance(ids_value, list) and len(ids_value) > 0:
                                logging.info(f"ids list length: {len(ids_value)}")
                                logging.info(f"First ID: {ids_value[0]}")
                                if len(ids_value) > 1:
                                    logging.info(f"Second ID: {ids_value[1]}")

                    if isinstance(statis, dict) and 'play_vv' in statis:
                        play_vv = statis.get('play_vv')
                        if isinstance(play_vv, (int, str)) and str(play_vv).isdigit():
                            vv = int(play_vv)
                            # Build the collection URL
                            video_url = f"https://www.douyin.com/collection/{mix_id}" if mix_id else ""

                            # Extract the collection cover image as full URLs.
                            # The candidate fields are tried in order and the
                            # first one present wins.
                            cover_image_url = ""
                            cover_image_backup_urls = []  # fallback URLs
                            for cover_key in ('cover', 'cover_url', 'image', 'pic'):
                                if cover_key in obj:
                                    cover = obj[cover_key]
                                    if isinstance(cover, dict) and 'url_list' in cover and cover['url_list']:
                                        cover_image_url = cover['url_list'][0]  # primary URL
                                        cover_image_backup_urls = cover['url_list'][1:]  # fallbacks
                                    elif isinstance(cover, str):
                                        cover_image_url = cover
                                    break

                            self.play_vv_items.append({
                                'play_vv': vv,
                                'formatted': self.format_count(vv),
                                'url': source_url,
                                'request_id': request_id,
                                'mix_name': mix_name,
                                'video_url': video_url,  # collection URL
                                'mix_id': mix_id,  # collection ID
                                'cover_image_url': cover_image_url,  # primary cover URL (full link)
                                'cover_backup_urls': cover_image_backup_urls,  # fallback cover URLs
                                'timestamp': datetime.now().isoformat()
                            })
                            logging.info(f'Extracted collection: {mix_name} (ID: {mix_id}) - {vv:,} plays')

                # Recurse into child objects
                for key, value in obj.items():
                    if isinstance(value, (dict, list)):
                        extract_mix_info(value, f"{path}.{key}" if path else key)

            elif isinstance(obj, list):
                for i, item in enumerate(obj):
                    if isinstance(item, (dict, list)):
                        extract_mix_info(item, f"{path}[{i}]" if path else f"[{i}]")

        extract_mix_info(data)

    def _extract_from_text_regex(self, text: str, source_url: str, request_id: str = None):
        """Extract information from text using regular expressions"""
        # Look for JSON fragments that carry the full collection info
        mix_pattern = r'\{[^{}]*"mix_id"\s*:\s*"([^"]*)"[^{}]*"mix_name"\s*:\s*"([^"]*)"[^{}]*"statis"\s*:\s*\{[^{}]*"play_vv"\s*:\s*(\d+)[^{}]*\}[^{}]*\}'

        for match in re.finditer(mix_pattern, text):
            try:
                mix_id = match.group(1)
                mix_name = match.group(2)
                vv = int(match.group(3))

                # Build the collection URL
                video_url = f"https://www.douyin.com/collection/{mix_id}" if mix_id else ""

                self.play_vv_items.append({
                    'play_vv': vv,
                    'formatted': self.format_count(vv),
                    'url': source_url,
                    'request_id': request_id,
                    'mix_name': mix_name,
                    'video_url': video_url,  # collection URL
                    'mix_id': mix_id,  # collection ID
                    'timestamp': datetime.now().isoformat()
                })
                logging.info(f'Regex-extracted collection: {mix_name} (ID: {mix_id}) - {vv:,} plays')
            except Exception:
                continue

        # Fallback: look for bare play_vv values
        for match in re.findall(r'"play_vv"\s*:\s*(\d+)', text):
            try:
                vv = int(match)
                # Skip play_vv values we have already recorded
                if not any(item['play_vv'] == vv for item in self.play_vv_items):
                    self.play_vv_items.append({
                        'play_vv': vv,
                        'formatted': self.format_count(vv),
                        'url': source_url,
                        'request_id': request_id,
                        'mix_name': '',  # collection name unknown
                        'video_url': '',  # URL unknown
                        'mix_id': '',  # mix_id unknown
                        'timestamp': datetime.now().isoformat()
                    })
            except Exception:
                continue

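    # collect_network_bodies below drains Chrome's 'performance' log, where
    # each entry wraps one CDP event as a JSON string, roughly:
    #   entry['message'] == '{"message": {"method": "Network.responseReceived",
    #                                     "params": {...}}, "webview": "..."}'
    # so json.loads(entry['message'])['message'] yields the {method, params}
    # event itself.
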
    def collect_network_bodies(self, duration_s: int = None):
        if duration_s is None:
            duration_s = self.duration_s
        logging.info(f'Collecting network response bodies for {duration_s}s')
        start = time.time()
        known_request_ids = set()

        # Target keywords (favorites / collections / videos)
        url_keywords = ['aweme', 'mix', 'collection', 'favorite', 'note', 'api']

        last_progress = 0
        while time.time() - start < duration_s:
            try:
                logs = self.driver.get_log('performance')
            except Exception as e:
                logging.warning(f'Failed to fetch performance log: {e}')
                time.sleep(1)
                continue

            for entry in logs:
                try:
                    message = json.loads(entry['message'])['message']
                except Exception:
                    continue

                method = message.get('method')
                params = message.get('params', {})

                # Record request URLs
                if method == 'Network.requestWillBeSent':
                    req_id = params.get('requestId')
                    url = params.get('request', {}).get('url', '')
                    if any(k in url for k in url_keywords):
                        self.captured_responses.append({'requestId': req_id, 'url': url, 'type': 'request'})

                # A response arrived; try to fetch its body
                if method == 'Network.responseReceived':
                    req_id = params.get('requestId')
                    url = params.get('response', {}).get('url', '')
                    type_ = params.get('type')  # XHR, Fetch, Document
                    if req_id and req_id not in known_request_ids:
                        known_request_ids.add(req_id)
                        # Only handle XHR/Fetch responses
                        if type_ in ('XHR', 'Fetch') and any(k in url for k in url_keywords):
                            try:
                                body_obj = self.driver.execute_cdp_cmd('Network.getResponseBody', {'requestId': req_id})
                                body_text = body_obj.get('body', '')
                                # The body may be base64-encoded
                                if body_obj.get('base64Encoded'):
                                    try:
                                        import base64
                                        body_text = base64.b64decode(body_text).decode('utf-8', errors='ignore')
                                    except Exception:
                                        pass

                                # Parse play_vv out of the body
                                self.parse_play_vv_from_text(body_text, url, req_id)
                            except Exception:
                                # Some bodies cannot be fetched (evicted, too
                                # large, or discarded after navigation)
                                pass
            elapsed = int(time.time() - start)
            if elapsed - last_progress >= 5:
                last_progress = elapsed
                logging.info(f'Progress: {elapsed}/{duration_s}s, play_vv candidates found: {len(self.play_vv_items)}')
            time.sleep(0.8)

        logging.info(f'Network collection finished, found {len(self.play_vv_items)} play_vv candidates')

    def parse_ssr_data(self):
        logging.info('Trying to parse SSR data from the page')
        # Try to read it straight off the window object
        keys = ['_SSR_HYDRATED_DATA', 'RENDER_DATA']
        for key in keys:
            try:
                data = self.driver.execute_script(f'return window.{key}')
                if data:
                    text = json.dumps(data, ensure_ascii=False)
                    self.parse_play_vv_from_text(text, f'page_{key}', None)
                    logging.info(f'Finished parsing {key}')
            except Exception:
                continue

        # Fallback: regex search over the page source
        try:
            page_source = self.driver.page_source
            self.parse_play_vv_from_text(page_source, 'page_source', None)
            # Also look for play_vv inside statis structures
            for m in re.findall(r'"statis"\s*:\s*\{[^}]*"play_vv"\s*:\s*(\d+)[^}]*\}', page_source):
                try:
                    vv = int(m)
                    # Skip play_vv values we have already recorded
                    if not any(item['play_vv'] == vv for item in self.play_vv_items):
                        self.play_vv_items.append({
                            'play_vv': vv,
                            'formatted': self.format_count(vv),
                            'url': 'page_source_statis',
                            'request_id': None,
                            'mix_name': '',  # collection name not available from statis
                            'video_url': '',  # URL not available from statis
                            'timestamp': datetime.now().isoformat()
                        })
                except Exception:
                    pass
        except Exception:
            pass

    def dedupe(self):
        # Deduplicate by play_vv value. Note this keys on the count alone, so
        # two distinct collections with identical play counts would collapse
        # into one; mix_id would be a stricter key where it is available.
        unique = []
        seen = set()
        for item in self.play_vv_items:
            vv = item['play_vv']
            if vv not in seen:
                unique.append(item)
                seen.add(vv)
        self.play_vv_items = unique

    def save_results(self):
        # Persist to MongoDB (a no-op with a warning when MongoDB is unavailable)
        self.save_to_mongodb()

        logging.info('Result saving step finished')

    def save_to_mongodb(self):
        """Save the collected data to MongoDB"""
        if self.mongo_client is None or self.collection is None:
            logging.warning('MongoDB not connected, skipping database save')
            return

        if not self.play_vv_items:
            logging.info('No data to save to MongoDB')
            return

        try:
            batch_time = datetime.now()
            documents = []

            for item in self.play_vv_items:
                # Keep the requested fields, plus cover_image_url as the full
                # cover image link of the collection
                doc = {
                    'batch_time': batch_time,
                    'mix_name': item.get('mix_name', ''),
                    'video_url': item.get('video_url', ''),
                    'playcount': item.get('formatted', ''),
                    'play_vv': item.get('play_vv', 0),
                    'request_id': item.get('request_id', ''),
                    'rank': 0,  # placeholder, recomputed below
                    'cover_image_url': item.get('cover_image_url', ''),  # primary cover URL (full link)
                    'cover_backup_urls': item.get('cover_backup_urls', [])  # fallback cover URLs
                }
                documents.append(doc)

            # Sort by play count, descending, and assign ranks
            documents.sort(key=lambda x: x['play_vv'], reverse=True)
            for i, doc in enumerate(documents, 1):
                doc['rank'] = i

            # Bulk insert
            result = self.collection.insert_many(documents)
            logging.info(f'Saved {len(result.inserted_ids)} records to MongoDB')

            # Summary statistics
            total_play_vv = sum(doc['play_vv'] for doc in documents)
            max_play_vv = max(doc['play_vv'] for doc in documents) if documents else 0

            logging.info(f'MongoDB save stats: total plays={total_play_vv:,}, max plays={max_play_vv:,}')
            logging.info('Saved fields: batch_time, mix_name, video_url, playcount, play_vv, request_id, rank, cover_image_url, cover_backup_urls')

            # Cover image extraction statistics
            cover_count = sum(1 for doc in documents if doc.get('cover_image_url'))
            backup_count = sum(1 for doc in documents if doc.get('cover_backup_urls'))
            logging.info(f'Cover image stats: {cover_count}/{len(documents)} collections have a primary cover URL, {backup_count} have fallback URLs')

        except Exception as e:
            logging.error(f'Error while saving to MongoDB: {e}')

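    # A document produced by save_to_mongodb looks roughly like this (values
    # illustrative):
    #   {'batch_time': datetime(...), 'mix_name': '...',
    #    'video_url': 'https://www.douyin.com/collection/<mix_id>',
    #    'playcount': '1.2万', 'play_vv': 12345, 'request_id': '...',
    #    'rank': 1, 'cover_image_url': 'https://...', 'cover_backup_urls': [...]}
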
    def run(self):
        try:
            self.setup_driver()
            self.navigate()
            self.ensure_login()
            self.trigger_loading()
            self.collect_network_bodies()
            self.parse_ssr_data()
            self.dedupe()
            self.save_results()
            logging.info('Done, play_vv count: %d', len(self.play_vv_items))
        finally:
            if self.driver:
                try:
                    self.driver.quit()
                except Exception:
                    pass


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Selenium+CDP Douyin play_vv scraper')
    parser.add_argument('--url', default='https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation', help='URL of the favorite-collections list page')
    parser.add_argument('--auto', action='store_true', help='Continue automatically, skipping the manual confirmation')
    parser.add_argument('--duration', type=int, default=60, help='Network response collection duration (seconds)')
    parser.add_argument('--driver', help='Override the chromedriver path')
    args = parser.parse_args()

    if args.driver:
        os.environ['OVERRIDE_CHROMEDRIVER'] = args.driver
    if args.auto:
        os.environ['AUTO_CONTINUE'] = '1'

    print('=== Selenium+CDP Douyin play_vv scraper ===')
    print('Reusing the local Chrome profile and scraping play_vv from network responses')
    scraper = DouyinPlayVVScraper(args.url, auto_continue=args.auto, duration_s=args.duration)
    scraper.run()
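
# A quick way to inspect the most recent batch afterwards (assumes the default
# MongoDB settings above; adjust for any MONGO_* overrides):
#   from pymongo import MongoClient
#   coll = MongoClient('localhost', 27017)['Rankings']['Rankings_list']
#   latest = coll.find_one(sort=[('batch_time', -1)])
#   for doc in coll.find({'batch_time': latest['batch_time']}).sort('rank', 1):
#       print(doc['rank'], doc['mix_name'], doc['playcount'])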