#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Scrape the real play counts (play_vv) of Douyin favorited collections via Selenium + Chrome DevTools Protocol.

Core capabilities:
- Enable CDP network events, fetch response bodies, and parse play_vv
- Reuse the local Chrome user data directory to get past the login wall
- Auto-scroll and refresh to trigger additional API requests
- Also parse the SSR data embedded in the page (window._SSR_HYDRATED_DATA / RENDER_DATA)

Usage:
1) By default, the logged-in Chrome profile under `config/chrome_profile` is reused.
2) If login is still required, complete it in the Chrome window that opens, then return to the terminal and press Enter.
3) The script scrolls and refreshes, collects network data automatically, and extracts play_vv.
"""

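# Example invocation (illustrative; the filename below is a placeholder, while the
# --auto / --duration / --driver flags are the ones defined in the argparse block
# at the bottom of this file):
#   python douyin_play_vv_scraper.py --auto --duration 90
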
import json
import re
import subprocess
import time
import logging
import os
import shutil
from datetime import datetime
import requests
import base64
import uuid
import sys
import psutil
import random
import threading
import argparse
from concurrent.futures import ThreadPoolExecutor

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
# Keep the import, but webdriver_manager is not used by default, to avoid stalls from network downloads
from webdriver_manager.chrome import ChromeDriverManager  # noqa: F401

# Add the project root directory to the Python path
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
# Make sure modules under the backend directory can be found
backend_dir = os.path.join(os.path.dirname(__file__), '..', '..')
sys.path.insert(0, backend_dir)
from database import db
from tos_client import oss_client

# Logging configuration
# Make sure the logs directory exists
script_dir = os.path.dirname(os.path.abspath(__file__))
logs_dir = os.path.join(script_dir, 'logs')
os.makedirs(logs_dir, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format='[%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(logs_dir, 'douyin_scraper.log'), encoding='utf-8'),
        logging.StreamHandler()
    ]
)

class DouyinPlayVVScraper:
    def __init__(self, start_url: str = None, auto_continue: bool = False, duration_s: int = 60):
        self.start_url = start_url or "https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation"
        self.auto_continue = auto_continue
        self.duration_s = duration_s
        self.driver = None
        self.play_vv_items = []  # list of dicts: {play_vv, formatted, url, request_id, mix_name, watched_item}
        self.captured_responses = []
        self.db = None
        self.collection = None
        self.image_cache = {}  # cache mapping image ID -> TOS URL {image_id: tos_url}
        self.all_collected_comments = []  # all comment data collected so far
        self._cleanup_old_profiles()
        self._setup_mongodb()
        self._load_image_cache()
    def _setup_mongodb(self):
        """Set up the MongoDB connection"""
        try:
            # Use the connection from database.py
            self.db = db

            # Pick the collection based on the run mode
            is_timer_mode = os.environ.get('TIMER_MODE') == '1'
            mongo_collection = 'Ranking_storage_list' if is_timer_mode else 'Rankings_list'
            self.collection = self.db[mongo_collection]

            logging.info(f'MongoDB connected, database: {self.db.name}, collection: {mongo_collection}')
            logging.info(f'Current run mode: {"timer mode" if is_timer_mode else "normal mode"}')

        except Exception as e:
            logging.error(f'MongoDB connection failed: {e}')
            self.db = None
            self.collection = None
    def _load_image_cache(self):
        """Load existing image ID -> TOS URL mappings from the database"""
        if self.collection is None:
            return

        try:
            # Query every record that has a cover image
            cursor = self.collection.find(
                {
                    'cover_image_url_original': {'$exists': True, '$ne': ''},
                    'cover_image_url': {'$exists': True, '$ne': ''}
                },
                {'cover_image_url_original': 1, 'cover_image_url': 1}
            )

            cache_count = 0
            for doc in cursor:
                original_url = doc.get('cover_image_url_original', '')
                tos_url = doc.get('cover_image_url', '')

                if original_url and tos_url and original_url != tos_url:
                    # Extract the image ID
                    image_id = self.extract_douyin_image_id(original_url)
                    if image_id:
                        self.image_cache[image_id] = tos_url
                        cache_count += 1

            logging.info(f'Loaded image cache from the database: {cache_count} image mappings')

        except Exception as e:
            logging.error(f'Failed to load the image cache: {e}')
    def _cleanup_old_profiles(self):
        """Remove temporary Chrome profiles that are more than one day old"""
        try:
            script_dir = os.path.dirname(os.path.abspath(__file__))
            profile_base_dir = os.path.join(script_dir, 'config', 'chrome_profile')
            if not os.path.exists(profile_base_dir):
                return

            current_time = time.time()
            one_day_ago = current_time - 24 * 60 * 60  # 24 hours ago

            for item in os.listdir(profile_base_dir):
                if item.startswith('run_'):
                    item_path = os.path.join(profile_base_dir, item)
                    if os.path.isdir(item_path):
                        try:
                            # Extract the timestamp from the directory name
                            timestamp = int(item.split('_')[1])
                            if timestamp < one_day_ago:
                                shutil.rmtree(item_path, ignore_errors=True)
                                logging.info(f'Removed old profile: {item}')
                        except (ValueError, IndexError):
                            # Skip entries whose timestamp cannot be parsed
                            continue
        except Exception as e:
            logging.warning(f'Error while cleaning up old profiles: {e}')
    def _cleanup_chrome_processes(self):
        """Terminate Chrome processes that may be holding the profile directory"""
        try:
            # Current profile directory
            script_dir = os.path.dirname(os.path.abspath(__file__))
            profile_dir = os.path.join(script_dir, 'config', 'chrome_profile', 'douyin_persistent')

            # Find Chrome processes that are using this profile
            killed_processes = []
            for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
                try:
                    if proc.info['name'] and 'chrome' in proc.info['name'].lower():
                        cmdline = proc.info['cmdline']
                        if cmdline and any(profile_dir in arg for arg in cmdline):
                            proc.terminate()
                            killed_processes.append(proc.info['pid'])
                            logging.info(f'Terminated Chrome process holding the profile: PID {proc.info["pid"]}')
                except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                    continue

            # Give the processes time to exit
            if killed_processes:
                time.sleep(2)

            return len(killed_processes) > 0

        except ImportError:
            # Fallback when psutil is unavailable: use the system command
            try:
                result = subprocess.run(['taskkill', '/f', '/im', 'chrome.exe'],
                                        capture_output=True, text=True, timeout=10)
                if result.returncode == 0:
                    logging.info('Cleaned up Chrome processes with taskkill')
                    time.sleep(2)
                    return True
            except Exception as e:
                logging.warning(f'Failed to clean up Chrome processes: {e}')
            return False
        except Exception as e:
            logging.warning(f'Error while cleaning up Chrome processes: {e}')
            return False
    def setup_driver(self):
        logging.info('Initializing Chrome WebDriver (CDP network logging enabled)')

        # Terminate Chrome processes that may be holding the profile directory
        self._cleanup_chrome_processes()

        chrome_options = Options()
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
        chrome_options.add_experimental_option('useAutomationExtension', False)
        chrome_options.add_argument('--disable-extensions')
        chrome_options.add_argument('--remote-allow-origins=*')
        chrome_options.add_argument('--remote-debugging-port=0')
        chrome_options.add_argument('--start-maximized')
        chrome_options.add_argument('--lang=zh-CN')
        # Use a fixed Chrome profile directory so the login state persists
        script_dir = os.path.dirname(os.path.abspath(__file__))
        profile_dir = os.path.join(script_dir, 'config', 'chrome_profile', 'douyin_persistent')
        os.makedirs(profile_dir, exist_ok=True)
        chrome_options.add_argument(f'--user-data-dir={profile_dir}')
        logging.info(f'Using persistent Chrome profile: {profile_dir}')
        # Set the Chrome binary path explicitly (common 32-bit and 64-bit install locations)
        possible_chrome_bins = [
            r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe",
            r"C:\Program Files\Google\Chrome\Application\chrome.exe"
        ]
        for bin_path in possible_chrome_bins:
            if os.path.exists(bin_path):
                chrome_options.binary_location = bin_path
                logging.info(f'Using Chrome binary: {bin_path}')
                break
        # Performance logging (Network events)
        chrome_options.set_capability('goog:loggingPrefs', {'performance': 'ALL'})
        # Only use a local chromedriver or one on PATH, to avoid depending on network downloads
        driver_ready = False
        candidates = []
        # The driver path can be overridden via an environment variable
        env_override = os.environ.get('OVERRIDE_CHROMEDRIVER')
        if env_override:
            candidates.append(env_override)
            logging.info(f'OVERRIDE_CHROMEDRIVER environment variable detected, using it first: {env_override}')
        # drivers directory next to this script (preferred)
        script_dir = os.path.dirname(os.path.abspath(__file__))
        script_driver_path = os.path.join(script_dir, 'drivers', 'chromedriver.exe')
        candidates.append(script_driver_path)
        logging.info(f'Trying the script directory path first: {script_driver_path}')

        # drivers directory under the project root
        user_driver_path = os.path.join(os.getcwd(), 'drivers', 'chromedriver.exe')
        candidates.append(user_driver_path)
        logging.info(f'Trying the project root path: {user_driver_path}')

        # Project root itself
        candidates.append(os.path.join(os.getcwd(), 'chromedriver.exe'))
        # Other possible locations
        candidates.append(os.path.join(os.getcwd(), 'drivers', 'chromedriver'))
        # chromedriver on PATH
        which_path = shutil.which('chromedriver')
        if which_path:
            candidates.append(which_path)

        if not driver_ready:
            for p in candidates:
                try:
                    if p and os.path.exists(p):
                        logging.info(f'Trying chromedriver: {p}')
                        service = Service(p)
                        self.driver = webdriver.Chrome(service=service, options=chrome_options)
                        driver_ready = True
                        logging.info(f'Started successfully with chromedriver: {p}')
                        try:
                            caps = self.driver.capabilities
                            browser_ver = caps.get('browserVersion') or caps.get('version')
                            cdver = caps.get('chrome', {}).get('chromedriverVersion')
                            logging.info(f'Chrome version: {browser_ver}, ChromeDriver version: {cdver}')
                        except Exception:
                            pass
                        break
                    else:
                        logging.info(f'Candidate path does not exist: {p}')
                except Exception as e:
                    logging.warning(f'Failed to start with {p}: {e}')

        if not driver_ready:
            # Last resort: webdriver-manager (may need network access)
            try:
                service = Service(ChromeDriverManager().install())
                self.driver = webdriver.Chrome(service=service, options=chrome_options)
                driver_ready = True
                logging.info('Started ChromeDriver via webdriver-manager')
            except Exception as e:
                raise RuntimeError('Could not start ChromeDriver. Please download a matching chromedriver into the project root or PATH manually, or allow network access so webdriver-manager can download it. Error: ' + str(e))
        # Anti-detection
        try:
            self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
        except Exception:
            pass

        # Enable the CDP Network domain
        try:
            self.driver.execute_cdp_cmd('Network.enable', {})
            logging.info('CDP Network enabled')
        except Exception as e:
            logging.warning(f'Failed to enable CDP Network: {e}')
    def navigate(self):
        logging.info(f'Navigating to: {self.start_url}')
        self.driver.get(self.start_url)
        time.sleep(8)  # extra wait for the page to load
    def ensure_login(self):
        """Make sure the user is logged in and on the favorited-collections page"""
        logging.info("Checking login state and page location...")

        # First check whether we are already logged in and on the right page
        if self._check_login_and_page():
            logging.info("Already logged in and on the favorited-collections page, skipping manual confirmation")
            return

        # Otherwise fall back to the manual login flow
        logging.info("Please complete the login manually in the browser window.")

        if self.auto_continue:
            logging.info('Auto-continue mode, skipping the manual wait...')
            time.sleep(5)
            return

        logging.info("Entering the manual login confirmation loop...")
        while True:
            # Ask the user to type a specific confirmation
            logging.info("Waiting for user confirmation...")
            user_input = input("Please log in in the browser and navigate to 我的 → 收藏 → 合集 (Me → Favorites → Collections). When done, type 'ok' here and press Enter: ")

            if user_input.strip().lower() != 'ok':
                logging.warning("Please type 'ok' to confirm you have logged in and navigated to 我的 → 收藏 → 合集 (Me → Favorites → Collections).")
                continue

            logging.info("User confirmed, checking the current page...")

            try:
                current_url = self.driver.current_url
                logging.info(f"Current page URL: {current_url}")
                if ("douyin.com/user/self" in current_url and
                        ("favorite_collection" in current_url or "compilation" in current_url)):
                    logging.info(f"Confirmed you are on the favorited-collections list page: {current_url}")
                    logging.info("The script will continue...")
                    break
                else:
                    # The user confirmed, but the page is wrong; keep waiting
                    logging.warning(f"The current page ({current_url}) is not the favorited-collections list page. Please make sure you have navigated to 我的 → 收藏 → 合集.")

            except Exception as e:
                if "browser has been closed" in str(e) or "no such window" in str(e) or "target window already closed" in str(e):
                    logging.error("The browser window has been closed, the script cannot continue.")
                    raise RuntimeError("Browser window has been closed")
                logging.warning(f"Error while checking the URL: {e}. Please try again.")
            time.sleep(1)
    def _check_login_and_page(self, timeout: int = 600) -> bool:
        """Check whether the user is logged in and on the right page"""
        try:
            current_url = self.driver.current_url
            logging.info(f"Current page URL: {current_url}")

            # Are we on the favorited-collections page?
            if ("douyin.com/user/self" in current_url and
                    ("favorite_collection" in current_url or "compilation" in current_url)):
                # Also verify the login state
                return self._detect_login_status(timeout)
            else:
                # Not on the right page; if logged in, navigate there automatically
                if self._detect_login_status(timeout):
                    logging.info("Logged in but not on the favorited-collections page, navigating automatically...")
                    self.driver.get(self.start_url)
                    time.sleep(3)
                    return True
                return False
        except Exception as e:
            logging.warning(f"Error while checking the login state: {e}")
            return False
    def _detect_login_status(self, timeout: int = 600) -> bool:
        """Automatically detect whether the user is logged in"""
        try:
            start = time.time()
            while time.time() - start < timeout:
                time.sleep(2)
                # Several selectors that indicate a logged-in state
                selectors = [
                    '[data-e2e="user-avatar"]',
                    '.user-avatar',
                    '[class*="avatar"]',
                    '[class*="Avatar"]'
                ]

                for selector in selectors:
                    try:
                        elements = self.driver.find_elements("css selector", selector)
                        if elements:
                            logging.info("User avatar detected, confirmed logged in")
                            return True
                    except Exception:
                        continue

                # Check for a login button (indicates logged out)
                login_selectors = [
                    '[data-e2e="login-button"]',
                    'button[class*="login"]',
                    'a[href*="login"]'
                ]

                for selector in login_selectors:
                    try:
                        elements = self.driver.find_elements("css selector", selector)
                        if elements:
                            logging.info("Login button detected, user is not logged in")
                            return False
                    except Exception:
                        continue

            logging.info("Login detection timed out, assuming not logged in")
            return False
        except Exception as e:
            logging.warning(f"Error while detecting the login state: {e}")
            return False
    def trigger_loading(self):
        logging.info('Triggering data loading: scroll + refresh')
        # Scroll to trigger lazy loading
        for i in range(8):
            self.driver.execute_script(f'window.scrollTo(0, {i * 900});')
            time.sleep(1.2)
        # Refresh to trigger fresh requests
        self.driver.refresh()
        time.sleep(4)
        for i in range(6):
            self.driver.execute_script(f'window.scrollTo(0, {i * 1200});')
            time.sleep(1.3)
    def format_count(self, n: int) -> str:
        if n >= 100_000_000:
            return f"{n/100_000_000:.1f}亿"
        if n >= 10_000:
            return f"{n/10_000:.1f}万"
        return str(n)
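    # Example of format_count behaviour (input values chosen here for illustration only):
    #   format_count(123_456_789) -> "1.2亿"   (>= 1e8 is shown in units of 亿)
    #   format_count(52_300)      -> "5.2万"   (>= 1e4 is shown in units of 万)
    #   format_count(980)         -> "980"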
    def parse_play_vv_from_text(self, text: str, source_url: str, request_id: str = None):
        """Parse play_vv, mix_name and watched_item information out of a text blob"""
        try:
            # Try to parse it as JSON first
            if text.strip().startswith('{') or text.strip().startswith('['):
                try:
                    data = json.loads(text)
                    self._extract_from_json_data(data, source_url, request_id)
                    return
                except json.JSONDecodeError:
                    pass

            # Not JSON; fall back to regex matching
            self._extract_from_text_regex(text, source_url, request_id)

        except Exception as e:
            logging.warning(f'Error while parsing text data: {e}')
    def _extract_from_json_data(self, data, source_url: str, request_id: str = None):
        """Recursively extract collection information from JSON data"""
        def extract_mix_info(obj, path=""):
            if isinstance(obj, dict):
                # Does this object describe a collection?
                if 'mix_id' in obj and 'statis' in obj:
                    mix_id = obj.get('mix_id', '')
                    mix_name = obj.get('mix_name', '')
                    statis = obj.get('statis', {})

                    # Debug: dump the structure of objects containing mix_id (first 3 only)
                    if len(self.play_vv_items) < 3:
                        logging.info("=== Debug: collection object structure ===")
                        logging.info(f"Object keys: {list(obj.keys())}")
                        # Look for video-related fields and the newly added fields
                        for key, value in obj.items():
                            if 'aweme' in key.lower() or 'video' in key.lower() or 'item' in key.lower() or 'ids' in key.lower():
                                logging.info(f"Possible video field {key}: {type(value)} - {str(value)[:200]}")
                            # Keys related to the newly added fields
                            elif any(keyword in key.lower() for keyword in ['author', 'creator', 'user', 'desc', 'description', 'total', 'count', 'episode']):
                                logging.info(f"Possible new field {key}: {type(value)} - {str(value)[:200]}")

                        # Inspect the ids field in particular
                        if 'ids' in obj:
                            ids_value = obj['ids']
                            logging.info(f"ids field details: {type(ids_value)} - {ids_value}")
                            if isinstance(ids_value, list) and len(ids_value) > 0:
                                logging.info(f"ids list length: {len(ids_value)}")
                                logging.info(f"First ID: {ids_value[0]}")
                                if len(ids_value) > 1:
                                    logging.info(f"Second ID: {ids_value[1]}")
                    if isinstance(statis, dict) and 'play_vv' in statis:
                        play_vv = statis.get('play_vv')
                        if isinstance(play_vv, (int, str)) and str(play_vv).isdigit():
                            vv = int(play_vv)
                            # Build the collection URL
                            video_url = f"https://www.douyin.com/collection/{mix_id}" if mix_id else ""

                            # Extract the collection cover image URL - store the full image link directly
                            cover_image_url = ""
                            cover_image_backup_urls = []  # backup URL list

                            # Look for cover image fields, preferring a complete URL
                            if 'cover' in obj:
                                cover = obj['cover']
                                if isinstance(cover, dict) and 'url_list' in cover and cover['url_list']:
                                    # Primary URL
                                    cover_image_url = cover['url_list'][0]
                                    # Backup URLs
                                    cover_image_backup_urls = cover['url_list'][1:] if len(cover['url_list']) > 1 else []
                                elif isinstance(cover, str):
                                    cover_image_url = cover
                            elif 'cover_url' in obj:
                                cover_url = obj['cover_url']
                                if isinstance(cover_url, dict) and 'url_list' in cover_url and cover_url['url_list']:
                                    cover_image_url = cover_url['url_list'][0]
                                    cover_image_backup_urls = cover_url['url_list'][1:] if len(cover_url['url_list']) > 1 else []
                                elif isinstance(cover_url, str):
                                    cover_image_url = cover_url
                            elif 'image' in obj:
                                image = obj['image']
                                if isinstance(image, dict) and 'url_list' in image and image['url_list']:
                                    cover_image_url = image['url_list'][0]
                                    cover_image_backup_urls = image['url_list'][1:] if len(image['url_list']) > 1 else []
                                elif isinstance(image, str):
                                    cover_image_url = image
                            elif 'pic' in obj:
                                pic = obj['pic']
                                if isinstance(pic, dict) and 'url_list' in pic and pic['url_list']:
                                    cover_image_url = pic['url_list'][0]
                                    cover_image_backup_urls = pic['url_list'][1:] if len(pic['url_list']) > 1 else []
                                elif isinstance(pic, str):
                                    cover_image_url = pic
                            # Extract the three additional fields
                            series_author = ""
                            desc = ""
                            updated_to_episode = 0

                            # Collection author / studio
                            if 'author' in obj:
                                author = obj['author']
                                if isinstance(author, dict):
                                    # Try several possible author fields
                                    series_author = (author.get('nickname') or
                                                     author.get('unique_id') or
                                                     author.get('short_id') or
                                                     author.get('name') or '')
                                elif isinstance(author, str):
                                    series_author = author
                            elif 'creator' in obj:
                                creator = obj['creator']
                                if isinstance(creator, dict):
                                    series_author = (creator.get('nickname') or
                                                     creator.get('unique_id') or
                                                     creator.get('name') or '')
                                elif isinstance(creator, str):
                                    series_author = creator
                            elif 'user' in obj:
                                user = obj['user']
                                if isinstance(user, dict):
                                    series_author = (user.get('nickname') or
                                                     user.get('unique_id') or
                                                     user.get('name') or '')
                                elif isinstance(user, str):
                                    series_author = user
                            # Collection description - check several possible fields
                            description_fields = ['desc', 'share_info']

                            # Check the desc field first
                            if 'desc' in obj and obj['desc']:
                                desc_value = str(obj['desc']).strip()
                                if desc_value:
                                    desc = desc_value
                                    logging.info("Extracted description from desc")

                            # If desc yielded nothing, check share_info
                            if not desc and 'share_info' in obj and isinstance(obj['share_info'], dict):
                                share_desc = obj['share_info'].get('share_desc', '').strip()
                                if share_desc:
                                    desc = share_desc
                                    logging.info("Extracted description from share_info.share_desc")

                            # Fallback: re-check the listed fields for any plain-string value
                            if not desc:
                                for field in description_fields:
                                    value = obj.get(field)
                                    if isinstance(value, str) and value.strip():
                                        desc = value.strip()
                                        logging.info(f"Extracted description from {field}")
                                        break

                            # Still nothing: search nested objects for a desc field
                            if not desc:
                                def search_nested_desc(data, depth=0):
                                    if depth > 3:  # limit recursion depth
                                        return None

                                    if isinstance(data, dict):
                                        # Check desc at the current level
                                        if 'desc' in data and data['desc']:
                                            desc_value = str(data['desc']).strip()
                                            if 5 <= len(desc_value) <= 1000:
                                                return desc_value

                                        # Recurse into nested objects
                                        for value in data.values():
                                            if isinstance(value, dict):
                                                nested_result = search_nested_desc(value, depth + 1)
                                                if nested_result:
                                                    return nested_result
                                    return None

                                desc = search_nested_desc(obj) or ''
                            # Total episode count - taken from the statis field
                            updated_to_episode = 0  # default
                            if 'statis' in obj and isinstance(obj['statis'], dict):
                                statis = obj['statis']
                                if 'updated_to_episode' in statis:
                                    try:
                                        episodes = int(statis['updated_to_episode'])
                                        if episodes > 0:
                                            updated_to_episode = episodes
                                            logging.info(f"Episode count from statis.updated_to_episode: {episodes}")
                                    except ValueError:
                                        logging.warning("updated_to_episode could not be converted to an integer")
                            else:
                                logging.info("No statis field found, or statis is not a dict")
                                try:
                                    episodes = int(obj['updated_to_episode'])
                                    if episodes > 0:
                                        updated_to_episode = episodes
                                        logging.info(f"Episode count from updated_to_episode: {episodes}")
                                except (KeyError, ValueError, TypeError):
                                    pass  # ignore missing or non-integer values
                            self.play_vv_items.append({
                                'play_vv': vv,
                                'formatted': self.format_count(vv),
                                'url': source_url,
                                'request_id': request_id,
                                'mix_name': mix_name,
                                'video_url': video_url,  # collection URL
                                'mix_id': mix_id,  # collection ID
                                'cover_image_url': cover_image_url,  # primary cover image URL (full link)
                                'cover_backup_urls': cover_image_backup_urls,  # backup cover image URLs
                                'series_author': series_author,  # collection author / studio
                                'desc': desc,  # collection description
                                'updated_to_episode': updated_to_episode,  # total episode count
                                'timestamp': datetime.now().isoformat()
                            })
                            logging.info(f'Extracted collection: {mix_name} (ID: {mix_id}) - {vv:,} plays')
                            if series_author:
                                logging.info(f'  Author: {series_author}')
                            if desc:
                                logging.info(f'  Description: {desc[:100]}{"..." if len(desc) > 100 else ""}')
                            if updated_to_episode > 0:
                                logging.info(f'  Total episodes: {updated_to_episode}')

                # Recurse into child objects
                for key, value in obj.items():
                    if isinstance(value, (dict, list)):
                        extract_mix_info(value, f"{path}.{key}" if path else key)

            elif isinstance(obj, list):
                for i, item in enumerate(obj):
                    if isinstance(item, (dict, list)):
                        extract_mix_info(item, f"{path}[{i}]" if path else f"[{i}]")

        extract_mix_info(data)
    def _extract_from_text_regex(self, text: str, source_url: str, request_id: str = None):
        """Extract information from text using regular expressions"""
        # Look for JSON fragments carrying the full collection info, including updated_to_episode inside statis
        mix_pattern = r'\{[^{}]*"mix_id"\s*:\s*"([^"]*)"[^{}]*"mix_name"\s*:\s*"([^"]*)"[^{}]*"statis"\s*:\s*\{[^{}]*"play_vv"\s*:\s*(\d+)[^{}]*"updated_to_episode"\s*:\s*(\d+)[^{}]*\}[^{}]*\}'

        for match in re.finditer(mix_pattern, text):
            try:
                mix_id = match.group(1)
                mix_name = match.group(2)
                vv = int(match.group(3))
                episodes = int(match.group(4))

                # Build the collection URL
                video_url = f"https://www.douyin.com/collection/{mix_id}" if mix_id else ""

                if episodes > 0:
                    logging.info(f"Episode count from statis.updated_to_episode: {episodes}")

                self.play_vv_items.append({
                    'play_vv': vv,
                    'formatted': self.format_count(vv),
                    'url': source_url,
                    'request_id': request_id,
                    'mix_name': mix_name,
                    'video_url': video_url,  # collection URL
                    'mix_id': mix_id,  # collection ID
                    'updated_to_episode': episodes if episodes > 0 else None,  # episode count from statis.updated_to_episode
                    'timestamp': datetime.now().isoformat()
                })
                logging.info(f'Collection extracted via regex: {mix_name} (ID: {mix_id}) - {vv:,} plays')
            except Exception:
                continue

        # Fallback: look for standalone play_vv values
        for match in re.findall(r'"play_vv"\s*:\s*(\d+)', text):
            try:
                vv = int(match)
                # Skip values we have already recorded
                if not any(item['play_vv'] == vv for item in self.play_vv_items):
                    self.play_vv_items.append({
                        'play_vv': vv,
                        'formatted': self.format_count(vv),
                        'url': source_url,
                        'request_id': request_id,
                        'mix_name': '',  # unknown collection name
                        'video_url': '',  # unknown URL
                        'mix_id': '',  # unknown mix_id
                        'updated_to_episode': None,  # unknown episode count
                        'timestamp': datetime.now().isoformat()
                    })
            except Exception:
                continue
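    # Illustrative sketch of the JSON fragment both extractors look for (field values
    # are made up here; only the key names and nesting mirror what the code above expects):
    #   {
    #     "mix_id": "7300000000000000000",
    #     "mix_name": "Example collection",
    #     "statis": {"play_vv": 123456789, "updated_to_episode": 80}
    #   }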
    def collect_network_bodies(self, duration_s: int = None):
        if duration_s is None:
            duration_s = self.duration_s
        logging.info(f'Collecting network response bodies for {duration_s}s')
        start = time.time()
        known_request_ids = set()

        # Target keywords (favorites / collections / videos)
        url_keywords = ['aweme', 'mix', 'collection', 'favorite', 'note', 'api']

        last_progress = 0
        while time.time() - start < duration_s:
            try:
                logs = self.driver.get_log('performance')
            except Exception as e:
                logging.warning(f'Failed to read the performance log: {e}')
                time.sleep(1)
                continue

            for entry in logs:
                try:
                    message = json.loads(entry['message'])['message']
                except Exception:
                    continue

                method = message.get('method')
                params = message.get('params', {})

                # Record request URLs
                if method == 'Network.requestWillBeSent':
                    req_id = params.get('requestId')
                    url = params.get('request', {}).get('url', '')
                    if any(k in url for k in url_keywords):
                        self.captured_responses.append({'requestId': req_id, 'url': url, 'type': 'request'})

                # A response arrived; try to fetch its body
                if method == 'Network.responseReceived':
                    req_id = params.get('requestId')
                    url = params.get('response', {}).get('url', '')
                    type_ = params.get('type')  # XHR, Fetch, Document
                    if req_id and req_id not in known_request_ids:
                        known_request_ids.add(req_id)
                        # Only handle XHR/Fetch
                        if type_ in ('XHR', 'Fetch') and any(k in url for k in url_keywords):
                            try:
                                body_obj = self.driver.execute_cdp_cmd('Network.getResponseBody', {'requestId': req_id})
                                body_text = body_obj.get('body', '')
                                # The body may be base64-encoded
                                if body_obj.get('base64Encoded'):
                                    try:
                                        body_text = base64.b64decode(body_text).decode('utf-8', errors='ignore')
                                    except Exception:
                                        pass

                                # Parse play_vv
                                self.parse_play_vv_from_text(body_text, url, req_id)
                            except Exception:
                                # Some response bodies are unavailable or too large
                                pass
            elapsed = int(time.time() - start)
            if elapsed - last_progress >= 5:
                last_progress = elapsed
                logging.info(f'Progress: {elapsed}/{duration_s}, targets found: {len(self.play_vv_items)}')
            time.sleep(0.8)

        logging.info(f'Network collection finished, {len(self.play_vv_items)} targets found')
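    # For reference, each performance-log entry's 'message' field is a JSON string roughly
    # shaped like the following (trimmed; the exact set of params fields varies by event):
    #   {"message": {"method": "Network.responseReceived",
    #                "params": {"requestId": "...", "type": "XHR",
    #                           "response": {"url": "https://www.douyin.com/aweme/..."}}}}
    # which is why the loop above does json.loads(entry['message'])['message'] before
    # dispatching on 'method'.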
    def parse_ssr_data(self):
        logging.info('Trying to parse the SSR data embedded in the page')
        # Try reading it directly from the window object
        keys = ['_SSR_HYDRATED_DATA', 'RENDER_DATA']
        for key in keys:
            try:
                data = self.driver.execute_script(f'return window.{key}')
                if data:
                    text = json.dumps(data, ensure_ascii=False)
                    self.parse_play_vv_from_text(text, f'page_{key}', None)
                    logging.info(f'Finished parsing {key}')
            except Exception:
                continue

        # Fallback: regex search over the page source
        try:
            page_source = self.driver.page_source
            self.parse_play_vv_from_text(page_source, 'page_source', None)
            # Also look for play_vv inside statis structures
            for m in re.findall(r'"statis"\s*:\s*\{[^}]*"play_vv"\s*:\s*(\d+)[^}]*\}', page_source):
                try:
                    vv = int(m)
                    # Skip values we have already recorded
                    if not any(item['play_vv'] == vv for item in self.play_vv_items):
                        self.play_vv_items.append({
                            'play_vv': vv,
                            'formatted': self.format_count(vv),
                            'url': 'page_source_statis',
                            'request_id': None,
                            'mix_name': '',  # collection name not available from statis
                            'video_url': '',  # URL not available from statis
                            'timestamp': datetime.now().isoformat()
                        })
                except Exception:
                    pass
        except Exception:
            pass
    def dedupe(self):
        # Deduplicate by play_vv value
        unique = []
        seen = set()
        for item in self.play_vv_items:
            vv = item['play_vv']
            if vv not in seen:
                unique.append(item)
                seen.add(vv)
        self.play_vv_items = unique
    def save_results(self):
        # Persist to MongoDB
        self.save_to_mongodb()

        logging.info('Results saved to MongoDB')
    def extract_douyin_image_id(self, cover_url):
        """
        Extract the unique image ID from a Douyin image URL

        Args:
            cover_url (str): Douyin image URL

        Returns:
            str: the image ID, or an empty string if extraction fails
        """
        if not cover_url:
            return ''

        try:
            # Two supported Douyin image URL formats:
            # 1. https://p{n}-sign.douyinpic.com/obj/tos-cn-i-dy/{image_id}?{params}
            # 2. https://p{n}-sign.douyinpic.com/obj/douyin-user-image-file/{image_id}?{params}
            # Extract the image ID with a regular expression
            patterns = [
                r'/obj/tos-cn-i-dy/([a-f0-9]+)',
                r'/obj/douyin-user-image-file/([a-f0-9]+)'
            ]

            for pattern in patterns:
                match = re.search(pattern, cover_url)
                if match:
                    image_id = match.group(1)
                    logging.debug(f'Image ID extracted: {image_id} from {cover_url}')
                    return image_id

            logging.warning(f'Could not extract an image ID from the URL: {cover_url}')
            return ''

        except Exception as e:
            logging.error(f'Exception while extracting the image ID: {cover_url}, error: {e}')
            return ''
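    # Illustrative example (the host and ID below are made up; only the path layout matters):
    #   extract_douyin_image_id(
    #       "https://p3-sign.douyinpic.com/obj/tos-cn-i-dy/0123456789abcdef?x-expires=123")
    #   -> "0123456789abcdef"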
    def upload_cover_image(self, cover_url, mix_name):
        """
        Upload a cover image to TOS and return a permanent URL (with deduplication)

        Args:
            cover_url: temporary cover image URL
            mix_name: collection name, used for logging and file naming

        Returns:
            str: the permanent URL, or the original URL if the upload fails
        """
        if not cover_url:
            return cover_url

        try:
            # Extract the image ID
            image_id = self.extract_douyin_image_id(cover_url)

            # If an image ID is available, check the cache first
            if image_id:
                if image_id in self.image_cache:
                    cached_url = self.image_cache[image_id]
                    logging.info(f'Using cached image: {image_id} -> {cached_url} (collection: {mix_name})')
                    return cached_url

            # Generate a random file name, keeping the original extension
            file_extension = '.jpg'  # Douyin covers are usually JPEG

            # Improved extension detection
            url_without_params = cover_url.split('?')[0]
            url_path = url_without_params.split('/')[-1]  # last path segment of the URL

            # Only use the suffix if it looks like a common image extension
            if '.' in url_path:
                potential_ext = url_path.split('.')[-1].lower()
                # Is it a common image extension?
                if potential_ext in ['jpg', 'jpeg', 'png', 'gif', 'webp', 'bmp']:
                    file_extension = '.' + potential_ext

            # Generate a unique file name
            random_filename = f"{uuid.uuid4().hex}{file_extension}"
            object_key = f"media/rank/{random_filename}"

            logging.info(f'Uploading cover image: {mix_name}')
            logging.info(f'Cover image URL: {cover_url}')

            # Upload from the URL to TOS and get the new URL
            oss_url = oss_client.upload_from_url(
                url=cover_url,
                object_key=object_key,
                return_url=True
            )

            logging.info(f'Cover image uploaded: {mix_name} -> {oss_url}')

            # Cache the result if an image ID is available
            if image_id:
                self.image_cache[image_id] = oss_url
                logging.debug(f'Image cache updated: {image_id} -> {oss_url}')

            return oss_url

        except Exception as e:
            logging.error(f'Cover image upload failed: {mix_name} - {str(e)}')
            return cover_url  # fall back to the original URL on failure
    def save_to_mongodb(self):
        """Persist the collected data to MongoDB"""
        if self.collection is None:
            logging.warning('MongoDB is not connected, skipping database save')
            return

        if not self.play_vv_items:
            logging.info('No data to save to MongoDB')
            return

        try:
            batch_time = datetime.now()
            documents = []

            for item in self.play_vv_items:
                # Original cover image URL
                original_cover_url = item.get('cover_image_url', '')
                mix_name = item.get('mix_name', '')

                # Handle the cover image
                permanent_cover_url = ''
                if original_cover_url:
                    # Upload the cover to TOS to get a permanent URL
                    permanent_cover_url = self.upload_cover_image(original_cover_url, mix_name)

                    # If the upload failed, log a warning but keep saving with the original URL
                    if permanent_cover_url == original_cover_url:
                        logging.warning(f'Cover upload failed, keeping the original URL: {mix_name}')
                else:
                    # No cover image: use an empty string
                    permanent_cover_url = ''
                # Fetch all video IDs belonging to the collection
                mix_id = item.get('mix_id', '')
                episode_video_ids = []
                if mix_id:
                    logging.info(f'Fetching all video IDs of collection {mix_name}')
                    current_episode_count = item.get('updated_to_episode') or 0
                    episode_video_ids = self.get_collection_videos(
                        mix_id=mix_id,
                        mix_name=mix_name,
                        current_episode_count=current_episode_count
                    )
                    logging.info(f'Collection {mix_name}: {len(episode_video_ids)} video IDs fetched')

                    # Fetch detailed interaction data for each video
                    logging.info(f'Fetching per-video interaction data for collection {mix_name}')
                    video_details_list = self.get_collection_video_details(
                        episode_video_ids=episode_video_ids,
                        mix_name=mix_name,
                        max_comments_per_video=10  # at most 10 comments per video
                    )

                    # Build per-episode details from the fetched data
                    episode_details = []
                    total_episodes = item.get('updated_to_episode') or 0

                    for i in range(total_episodes):
                        episode_number = i + 1
                        video_id = episode_video_ids[i] if i < len(episode_video_ids) else ''

                        # Find the matching video detail record
                        video_detail = None
                        if i < len(video_details_list):
                            video_detail = video_details_list[i]

                        if video_detail and video_detail.get('success', False):
                            # Use the fetched data
                            likes = video_detail.get('likes', 0)
                            shares = video_detail.get('shares', 0)
                            favorites = video_detail.get('favorites', 0)

                            episode_info = {
                                'episode_number': episode_number,
                                'video_id': video_id,
                                'likes': likes,
                                'shares': shares,
                                'favorites': favorites,
                                'likes_formatted': self.format_interaction_count(likes),
                                'shares_formatted': self.format_interaction_count(shares),
                                'favorites_formatted': self.format_interaction_count(favorites),
                                'comments': video_detail.get('comments', [])
                            }
                        else:
                            # Fall back to defaults
                            episode_info = {
                                'episode_number': episode_number,
                                'video_id': video_id,
                                'likes': 0,
                                'shares': 0,
                                'favorites': 0,
                                'likes_formatted': '0',
                                'shares_formatted': '0',
                                'favorites_formatted': '0',
                                'comments': []
                            }

                        episode_details.append(episode_info)

                    # Summarize the fetched data
                    total_likes = sum(ep.get('likes', 0) for ep in episode_details)
                    total_comments = sum(len(ep.get('comments', [])) for ep in episode_details)
                    logging.info(f'Collection {mix_name} detail summary: total likes={total_likes:,}, total comments={total_comments}')
                else:
                    # No video IDs available: fall back to default episode_details
                    episode_details = [
                        {
                            'episode_number': i + 1,
                            'video_id': '',
                            'likes': 0,
                            'shares': 0,
                            'favorites': 0,
                            'likes_formatted': '0',
                            'shares_formatted': '0',
                            'favorites_formatted': '0',
                            'comments': []
                        } for i in range(item.get('updated_to_episode') or 0)
                    ]
                # Keep the 7 requested fields + cover_image_url (full cover link) + the new fields
                doc = {
                    'batch_time': batch_time,
                    'mix_name': mix_name,
                    'video_url': item.get('video_url', ''),
                    'playcount': item.get('formatted', ''),
                    'play_vv': item.get('play_vv', 0),
                    'request_id': item.get('request_id', ''),
                    'rank': 0,  # placeholder, recomputed below
                    'cover_image_url_original': original_cover_url,  # original temporary URL, kept for debugging
                    'cover_image_url': permanent_cover_url,  # permanent cover image URL
                    'cover_backup_urls': item.get('cover_backup_urls', []),  # backup cover image URLs
                    # Newly added fields
                    'series_author': item.get('series_author', ''),  # collection author / studio
                    'desc': item.get('desc', ''),  # collection description
                    'updated_to_episode': item.get('updated_to_episode') or 0,  # total episode count
                    'episode_video_ids': episode_video_ids,  # video ID list, one per episode
                    'episode_details': episode_details  # per-episode details
                }
                documents.append(doc)

            # Sort by play count descending and assign ranks
            documents.sort(key=lambda x: x['play_vv'], reverse=True)
            for i, doc in enumerate(documents, 1):
                doc['rank'] = i

            # Bulk insert
            result = self.collection.insert_many(documents)
            logging.info(f'Saved {len(result.inserted_ids)} records to MongoDB')

            # Summary statistics
            total_play_vv = sum(doc['play_vv'] for doc in documents)
            max_play_vv = max(doc['play_vv'] for doc in documents) if documents else 0

            logging.info(f'MongoDB save summary: total play_vv={total_play_vv:,}, max play_vv={max_play_vv:,}')
            logging.info('Saved fields: batch_time, mix_name, video_url, playcount, play_vv, request_id, rank, cover_image_url_original, cover_image_url, cover_backup_urls, series_author, desc, updated_to_episode, episode_video_ids, episode_details')

            # Cover image statistics
            cover_count = sum(1 for doc in documents if doc.get('cover_image_url'))
            original_count = sum(1 for item in self.play_vv_items if item.get('cover_image_url'))
            success_count = sum(1 for doc in documents if doc.get('cover_image_url') and doc.get('cover_image_url') != doc.get('cover_image_url_original', ''))

            logging.info(f'Cover images: {cover_count}/{len(documents)} collections have a cover URL')
            logging.info(f'Cover uploads: {success_count}/{original_count} covers uploaded to TOS')
            logging.info(f'Image cache: currently {len(self.image_cache)} image mappings')

        except Exception as e:
            logging.error(f'Error while saving to MongoDB: {e}')
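    # Note: save_to_mongodb() above relies on self.get_collection_video_details() and
    # self.format_interaction_count(), which are not part of this excerpt and are assumed
    # to be defined elsewhere in the file. Purely as an illustration, a formatter that
    # mirrors format_count() might look like the commented sketch below:
    #
    #   def format_interaction_count(self, n: int) -> str:
    #       if n >= 100_000_000:
    #           return f"{n/100_000_000:.1f}亿"
    #       if n >= 10_000:
    #           return f"{n/10_000:.1f}万"
    #       return str(n)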
    def get_video_info(self, video_id: str) -> dict:
        """Fetch detailed information for a single video

        Args:
            video_id: video ID
        Returns:
            dict: dictionary with the video details
        """
        video_url = f'https://www.douyin.com/video/{video_id}'
        logging.info(f'Fetching video info: {video_url}')

        # Clear the browser cache and cookies before loading the video page
        self.driver.execute_cdp_cmd('Network.clearBrowserCache', {})
        self.driver.execute_cdp_cmd('Network.clearBrowserCookies', {})
        self.driver.get(video_url)
        time.sleep(3)

        # Wait for the page to finish loading
        try:
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "video"))
            )
        except Exception as e:
            logging.warning(f'Timed out waiting for the video element: {e}')

        # Read the network request log
        logs = self.driver.get_log('performance')
        video_info = {}

        for entry in logs:
            try:
                log = json.loads(entry['message'])['message']
                if (
                    'Network.responseReceived' in log['method']
                    and 'response' in log['params']
                    and log['params']['response']
                    and 'url' in log['params']['response']
                    and '/web/api/v2/aweme/iteminfo' in log['params']['response']['url']
                ):
                    request_id = log['params']['requestId']
                    response = self.driver.execute_cdp_cmd('Network.getResponseBody', {'requestId': request_id})
                    if response and 'body' in response:
                        data = json.loads(response['body'])
                        if 'item_list' in data and len(data['item_list']) > 0:
                            item = data['item_list'][0]
                            video_info = {
                                'video_id': item.get('aweme_id'),
                                'create_time': item.get('create_time'),
                                'desc': item.get('desc'),
                                'duration': item.get('duration'),
                                'mix_info': {
                                    'mix_id': item.get('mix_info', {}).get('mix_id'),
                                    'mix_name': item.get('mix_info', {}).get('mix_name'),
                                    'total': item.get('mix_info', {}).get('total')
                                }
                            }
                            break
            except Exception as e:
                logging.warning(f'Error while parsing a log entry: {e}')

        return video_info
    def get_collection_videos(self, mix_id: str, mix_name: str = '', current_episode_count: int = 0) -> list:
        """Fetch all video IDs of a collection, with incremental updates

        Args:
            mix_id: collection ID
            mix_name: collection name, for logging
            current_episode_count: currently known episode count
        Returns:
            list: video IDs ordered by episode number
        """
        # Skip this function in timer mode
        if os.environ.get('TIMER_MODE') == '1':
            logging.info('Timer mode: skipping get_collection_videos')
            return []

        cached_videos = []
        try:
            # Check the cache file
            cache_dir = os.path.join(os.path.dirname(__file__), 'episode_video_ids')
            # Make sure the cache directory exists
            os.makedirs(cache_dir, exist_ok=True)
            cache_file = os.path.join(cache_dir, f'video_ids_{mix_id}.json')

            try:
                if os.path.exists(cache_file):
                    with open(cache_file, 'r', encoding='utf-8') as f:
                        cache_data = json.load(f)
                        cached_videos = cache_data.get('episodes', [])
                        last_update = cache_data.get('last_update')

                        # If the cached episode count matches the current count, return the cache directly
                        if len(cached_videos) == current_episode_count:
                            logging.info(f"Using the cached video list: {mix_name} (ID: {mix_id})")
                            return [video['video_id'] for video in cached_videos]
            except Exception as e:
                logging.warning(f"Failed to read the cache file: {e}")
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                'Accept': 'application/json, text/plain, */*',
                'Accept-Language': 'zh-CN,zh;q=0.9',
                'Referer': 'https://www.douyin.com/',
            }

            params = {
                'device_platform': 'webapp',
                'aid': '6383',
                'channel': 'channel_pc_web',
                'pc_client_type': '1',
                'version_code': '170400',
                'version_name': '17.4.0',
                'cookie_enabled': 'true',
                'platform': 'PC',
                'downlink': '10',
                'mix_id': mix_id,
                'cursor': '0',
                'count': '30',
                'screen_width': '1920',
                'screen_height': '1080',
                'browser_language': 'zh-CN',
                'browser_platform': 'Win32',
                'browser_name': 'Chrome',
                'browser_version': '120.0.0.0',
                'browser_online': 'true',
                'engine_name': 'Blink',
                'engine_version': '120.0.0.0',
                'os_name': 'Windows',
                'os_version': '10',
                'cpu_core_num': '16',
                'device_memory': '8',
                'effective_type': '4g',
                'round_trip_time': '50',
            }

            all_videos = []

            while True:
                response = requests.get(
                    'https://www.douyin.com/aweme/v1/web/mix/aweme/',
                    params=params,
                    cookies=self.get_cookies_dict(),
                    headers=headers
                )

                if response.status_code != 200:
                    logging.error(f"Request failed: {response.status_code}")
                    logging.error(f"Response body: {response.text}")
                    break

                try:
                    data = response.json()
                    aweme_list = data.get('aweme_list', [])
                    if not aweme_list:
                        break

                    for aweme in aweme_list:
                        video_id = aweme.get('aweme_id')
                        if video_id:
                            all_videos.append({
                                'video_id': video_id,
                                'episode_num': int(aweme.get('episode_num', 0))
                            })

                    has_more = data.get('has_more', False)
                    if not has_more:
                        break

                    params['cursor'] = str(len(all_videos))
                    time.sleep(1)

                except json.JSONDecodeError as e:
                    logging.error(f"JSON decode error: {e}")
                    logging.error(f"Response body: {response.text}")
                    break

            if not all_videos:
                if cached_videos:
                    logging.warning(f"Failed to fetch the video list, falling back to the cache: {mix_name} (ID: {mix_id})")
                    return [video['video_id'] for video in cached_videos]
                return []

            logging.info(f"Fetched {len(all_videos)} video IDs")

            # Sort by episode number
            all_videos.sort(key=lambda x: x['episode_num'])

            # Assemble video ID + episode number records
            episode_info = []
            for video in all_videos:
                episode_info.append({
                    'video_id': video['video_id'],
                    'episode_num': video['episode_num']
                })

            # Were any new episodes found?
            if len(episode_info) > len(cached_videos):
                logging.info(f"New episodes found: {mix_name} (ID: {mix_id}), {len(episode_info) - len(cached_videos)} new")

            # Write the cache file
            with open(cache_file, 'w', encoding='utf-8') as f:
                json.dump({
                    'episodes': episode_info,
                    'total_count': len(episode_info),
                    'last_update': datetime.now().isoformat(),
                    'mix_name': mix_name
                }, f, ensure_ascii=False, indent=2)

            # Return the list of video IDs
            return [video['video_id'] for video in all_videos]

        except Exception as e:
            logging.error(f"Error while fetching collection videos: {e}")
            # On error, fall back to the cache if available
            if cached_videos:
                logging.warning(f"Using the cached video list: {mix_name} (ID: {mix_id})")
                return [video['video_id'] for video in cached_videos]
            return []
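    # The cache file written above (episode_video_ids/video_ids_<mix_id>.json) therefore
    # looks roughly like this (values shown are placeholders):
    #   {
    #     "episodes": [{"video_id": "7xx...", "episode_num": 1}, ...],
    #     "total_count": 80,
    #     "last_update": "2024-01-01T00:00:00",
    #     "mix_name": "Example collection"
    #   }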
    def get_cookies_dict(self):
        """Return the current page's cookies as a dict (cached after the first call)"""
        if not hasattr(self, 'cookies') or not self.cookies:
            self.cookies = {cookie['name']: cookie['value'] for cookie in self.driver.get_cookies()}
        return self.cookies
    def run(self):
        try:
            self.setup_driver()
            self.navigate()
            self.ensure_login()
            self.trigger_loading()
            self.collect_network_bodies()
            self.parse_ssr_data()
            self.dedupe()
            self.save_results()
            logging.info('Done, play_vv count: %d', len(self.play_vv_items))
        finally:
            if self.driver:
                try:
                    self.driver.quit()
                except Exception:
                    pass

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Selenium+CDP Douyin play_vv scraper')
    parser.add_argument('--url', default='https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation', help='URL of the favorited-collections list page')
    parser.add_argument('--auto', action='store_true', help='Continue automatically, skip the Enter-to-continue wait')
    parser.add_argument('--duration', type=int, default=60, help='How long to collect network responses (seconds)')
    parser.add_argument('--driver', help='Override the chromedriver path')
    args = parser.parse_args()

    if args.driver:
        os.environ['OVERRIDE_CHROMEDRIVER'] = args.driver
    if args.auto:
        os.environ['AUTO_CONTINUE'] = '1'

    print('=== Selenium+CDP Douyin play_vv scraper ===')
    scraper = DouyinPlayVVScraper(args.url, auto_continue=args.auto, duration_s=args.duration)
    scraper.run()