From 9b5b43099720f06950b4033b020daa6dcbb18ebc Mon Sep 17 00:00:00 2001 From: Qyir <13521889462@163.com> Date: Mon, 3 Nov 2025 16:00:29 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=90=8E=E5=8F=B0=E7=AE=A1?= =?UTF-8?q?=E7=90=86=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/Timer_worker.py | 107 +++++++++++++----- .../handlers/Rankings/rank_data_scraper.py | 73 ++++++------ backend/routers/rank_api_routes.py | 89 ++++++++++++--- 3 files changed, 192 insertions(+), 77 deletions(-) diff --git a/backend/Timer_worker.py b/backend/Timer_worker.py index ec5c0c5..6e4cffa 100644 --- a/backend/Timer_worker.py +++ b/backend/Timer_worker.py @@ -66,6 +66,8 @@ def setup_logging(quiet_mode=False): class DouyinAutoScheduler: def __init__(self): self.is_running = False + # 创建logger实例 + self.logger = logging.getLogger(__name__) def _normalize_play_vv(self, play_vv): """标准化播放量数据类型,将字符串转换为数字""" @@ -82,23 +84,33 @@ class DouyinAutoScheduler: """按短剧名称去重,保留播放量最高的记录""" unique_data = {} for video in videos: - mix_name = video.get("mix_name", "") - if mix_name: - # 标准化播放量数据类型 - play_vv = self._normalize_play_vv(video.get("play_vv", 0)) + mix_name = video.get("mix_name", "").strip() + + # 过滤掉空的或无效的mix_name + if not mix_name or mix_name == "" or mix_name.lower() == "null": + self.logger.warning(f"跳过空的或无效的mix_name记录: {video.get('_id', 'unknown')}") + continue - if mix_name not in unique_data or play_vv > unique_data[mix_name].get("play_vv", 0): - if include_rank: - # 用于昨天数据的格式 - unique_data[mix_name] = { - "play_vv": play_vv, - "video_id": str(video.get("_id", "")), - "rank": 0 # 稍后计算排名 - } - else: - # 用于今天数据的格式,直接更新原视频对象 - video["play_vv"] = play_vv - unique_data[mix_name] = video + # 标准化播放量数据类型 + play_vv = self._normalize_play_vv(video.get("play_vv", 0)) + + # 确保播放量大于0,过滤无效数据 + if play_vv <= 0: + self.logger.warning(f"跳过播放量为0或无效的记录: mix_name={mix_name}, play_vv={video.get('play_vv', 0)}") + continue + + if mix_name not in unique_data or play_vv > unique_data[mix_name].get("play_vv", 0): + if include_rank: + # 用于昨天数据的格式 + unique_data[mix_name] = { + "play_vv": play_vv, + "video_id": str(video.get("_id", "")), + "rank": 0 # 稍后计算排名 + } + else: + # 用于今天数据的格式,直接更新原视频对象 + video["play_vv"] = play_vv + unique_data[mix_name] = video return unique_data @@ -181,10 +193,21 @@ class DouyinAutoScheduler: logging.info(f"📊 最新批次数据数量: {len(today_videos_raw)}") # 按短剧名称去重,每个短剧只保留播放量最高的一条 + # 🚫 过滤掉空的或无效的mix_name和播放量为0的记录 unique_videos = {} for video in today_videos_raw: - mix_name = video.get("mix_name", "") - if mix_name and (mix_name not in unique_videos or video.get("play_vv", 0) > unique_videos[mix_name].get("play_vv", 0)): + mix_name = video.get("mix_name", "").strip() + play_vv = video.get("play_vv", 0) + + # 过滤掉空的或无效的mix_name + if not mix_name or mix_name == "" or mix_name.lower() == "null": + continue + + # 过滤掉播放量为0或无效的记录 + if play_vv <= 0: + continue + + if mix_name not in unique_videos or play_vv > unique_videos[mix_name].get("play_vv", 0): unique_videos[mix_name] = video today_videos = list(unique_videos.values()) @@ -214,10 +237,21 @@ class DouyinAutoScheduler: }).sort("play_vv", -1)) # 按短剧名称去重,每个短剧只保留播放量最高的一条 + # 🚫 过滤掉空的或无效的mix_name和播放量为0的记录 unique_yesterday_videos = {} for video in yesterday_videos_raw: - mix_name = video.get("mix_name", "") - if mix_name and (mix_name not in unique_yesterday_videos or video.get("play_vv", 0) > unique_yesterday_videos[mix_name].get("play_vv", 0)): + mix_name = video.get("mix_name", "").strip() + play_vv = video.get("play_vv", 0) + + # 过滤掉空的或无效的mix_name + if not mix_name or mix_name == "" or mix_name.lower() == "null": + continue + + # 过滤掉播放量为0或无效的记录 + if play_vv <= 0: + continue + + if mix_name not in unique_yesterday_videos or play_vv > unique_yesterday_videos[mix_name].get("play_vv", 0): unique_yesterday_videos[mix_name] = video # 将昨天的数据转换为字典,以短剧名称为键 @@ -281,24 +315,44 @@ class DouyinAutoScheduler: rankings_management_collection = db['Rankings_management'] # 生成排序后的榜单数据 - for i, item in enumerate(videos_with_growth, 1): + rank = 1 # 使用独立的排名计数器 + for item in videos_with_growth: video = item["video"] video_id = str(video.get("_id", "")) current_play_vv = video.get("play_vv", 0) - mix_name = video.get("mix_name", "") + mix_name = video.get("mix_name", "").strip() + + # 🚫 跳过无效数据:确保mix_name不为空且播放量大于0 + # 注意:这些数据应该已经在去重阶段被过滤掉了,这里是双重保险 + if not mix_name or mix_name == "" or mix_name.lower() == "null": + self.logger.warning(f"跳过空的mix_name记录,video_id: {video_id}") + continue + + if current_play_vv <= 0: + self.logger.warning(f"跳过播放量无效的记录: mix_name={mix_name}, play_vv={current_play_vv}") + continue # 计算排名变化(基于昨天的排名) rank_change = 0 if not item["is_new"] and item["yesterday_data"]: yesterday_rank = item["yesterday_data"].get("rank", 0) - rank_change = yesterday_rank - i + rank_change = yesterday_rank - rank # 使用当前排名计数器 - # 🔍 从Rankings_management获取详细信息 - management_data = rankings_management_collection.find_one({"mix_name": mix_name}) + # 🔍 从Rankings_management获取详细信息(按日期和mix_name查询) + today_str = datetime.now().strftime('%Y-%m-%d') + management_data = rankings_management_collection.find_one({ + "mix_name": mix_name, + "$or": [ + {"created_at": {"$gte": datetime.strptime(today_str, '%Y-%m-%d'), + "$lt": datetime.strptime(today_str, '%Y-%m-%d') + timedelta(days=1)}}, + {"last_updated": {"$gte": datetime.strptime(today_str, '%Y-%m-%d'), + "$lt": datetime.strptime(today_str, '%Y-%m-%d') + timedelta(days=1)}} + ] + }) ranking_item = { # 🎯 核心榜单字段 - "rank": i, + "rank": rank, # 使用排名计数器 "title": mix_name, "mix_name": mix_name, # 确保包含mix_name字段用于同步 "play_vv": current_play_vv, @@ -344,6 +398,7 @@ class DouyinAutoScheduler: } comprehensive_ranking["data"].append(ranking_item) + rank += 1 # 递增排名计数器 # 为每次计算添加唯一的时间戳,确保数据唯一性 current_timestamp = datetime.now() diff --git a/backend/handlers/Rankings/rank_data_scraper.py b/backend/handlers/Rankings/rank_data_scraper.py index f0f723a..cf6b656 100644 --- a/backend/handlers/Rankings/rank_data_scraper.py +++ b/backend/handlers/Rankings/rank_data_scraper.py @@ -769,6 +769,16 @@ class DouyinPlayVVScraper: play_vv = statis.get('play_vv') if isinstance(play_vv, (int, str)) and str(play_vv).isdigit(): vv = int(play_vv) + + # 数据验证:确保播放量大于0且合集名称不为空 + if vv <= 0: + logging.warning(f"跳过无效的播放量数据: mix_name={mix_name}, play_vv={vv}") + return + + if not mix_name or mix_name.strip() == "": + logging.warning(f"跳过缺少合集名称的数据: play_vv={vv}") + return + # 构建合集链接 video_url = f"https://www.douyin.com/collection/{mix_id}" if mix_id else "" @@ -972,6 +982,15 @@ class DouyinPlayVVScraper: vv = int(match.group(3)) episodes = int(match.group(4)) + # 数据验证:确保播放量大于0且合集名称不为空 + if vv <= 0: + logging.warning(f"正则提取跳过无效的播放量数据: mix_name={mix_name}, play_vv={vv}") + continue + + if not mix_name or mix_name.strip() == "": + logging.warning(f"正则提取跳过缺少合集名称的数据: play_vv={vv}") + continue + # 构建合集链接 video_url = f"https://www.douyin.com/collection/{mix_id}" if mix_id else "" @@ -1006,27 +1025,17 @@ class DouyinPlayVVScraper: for match in re.findall(r'"play_vv"\s*:\s*(\d+)', text): try: vv = int(match) + # 数据验证:跳过无效的播放量数据 + if vv <= 0: + logging.warning(f"跳过无效的播放量数据: play_vv={vv}") + continue + # 检查是否已经存在相同的play_vv if not any(item['play_vv'] == vv for item in self.play_vv_items): - # 构建合集数据 - item_data = { - 'play_vv': vv, - 'formatted': self.format_count(vv), - 'url': source_url, - 'request_id': request_id, - 'mix_name': '', # 未知合集名称 - 'video_url': '', # 未知链接 - 'mix_id': '', # 未知mix_id - 'updated_to_episode': None, # 未知集数 - 'timestamp': datetime.now().isoformat() - } - - # 添加到列表(保持原有逻辑) - self.play_vv_items.append(item_data) - - # 实时保存到数据库(对于未知合集,可能不需要实时保存,但为了一致性还是保存) - if self.realtime_save_enabled: - self.save_single_item_realtime(item_data) + # 由于无法获取完整的合集信息,跳过这些不完整的数据 + # 避免产生mix_name为空的无效记录 + logging.warning(f"跳过不完整的数据记录: play_vv={vv}, 缺少合集名称") + continue except Exception: continue @@ -1129,25 +1138,17 @@ class DouyinPlayVVScraper: for m in re.findall(r'"statis"\s*:\s*\{[^}]*"play_vv"\s*:\s*(\d+)[^}]*\}', page_source): try: vv = int(m) + # 数据验证:跳过无效的播放量数据 + if vv <= 0: + logging.warning(f"跳过无效的播放量数据: play_vv={vv}") + continue + # 检查是否已经存在相同的play_vv if not any(item['play_vv'] == vv for item in self.play_vv_items): - # 构建合集数据 - item_data = { - 'play_vv': vv, - 'formatted': self.format_count(vv), - 'url': 'page_source_statis', - 'request_id': None, - 'mix_name': '', # 从statis中无法获取合集名称 - 'video_url': '', # 从statis中无法获取链接 - 'timestamp': datetime.now().isoformat() - } - - # 添加到列表(保持原有逻辑) - self.play_vv_items.append(item_data) - - # 实时保存到数据库 - if self.realtime_save_enabled: - self.save_single_item_realtime(item_data) + # 由于从statis中无法获取完整的合集信息,跳过这些不完整的数据 + # 避免产生mix_name为空的无效记录 + logging.warning(f"跳过不完整的数据记录: play_vv={vv}, 来源statis但缺少合集名称") + continue except Exception: pass except Exception: diff --git a/backend/routers/rank_api_routes.py b/backend/routers/rank_api_routes.py index 10dc818..2f07136 100644 --- a/backend/routers/rank_api_routes.py +++ b/backend/routers/rank_api_routes.py @@ -1200,8 +1200,18 @@ def update_content_classification(): } field_name = field_mapping[classification_type] - # 首先从Rankings_management获取短剧的mix_id - mgmt_doc = rankings_management_collection.find_one({"mix_name": mix_name}) + # 首先从Rankings_management获取短剧的mix_id,使用今天的日期 + today = datetime.now().date() + start_of_day = datetime.combine(today, datetime.min.time()) + end_of_day = datetime.combine(today, datetime.max.time()) + + mgmt_doc = rankings_management_collection.find_one({ + "mix_name": mix_name, + "$or": [ + {"created_at": {"$gte": start_of_day, "$lte": end_of_day}}, + {"last_updated": {"$gte": start_of_day, "$lte": end_of_day}} + ] + }) if not mgmt_doc: return jsonify({"success": False, "message": f"未找到短剧: {mix_name}"}) @@ -1286,8 +1296,14 @@ def update_content_classification(): logging.info(f"分类更新: {message}, Rankings_management({result_mgmt.modified_count}), Ranking_storage({result_storage.modified_count})") - # 获取更新后的分类状态 - updated_mgmt_doc = rankings_management_collection.find_one({"mix_name": mix_name}) + # 获取更新后的分类状态,使用今天的日期 + updated_mgmt_doc = rankings_management_collection.find_one({ + "mix_name": mix_name, + "$or": [ + {"created_at": {"$gte": start_of_day, "$lte": end_of_day}}, + {"last_updated": {"$gte": start_of_day, "$lte": end_of_day}} + ] + }) classification_status = { 'novel': mix_id in updated_mgmt_doc.get('Novel_IDs', []) if updated_mgmt_doc else False, 'anime': mix_id in updated_mgmt_doc.get('Anime_IDs', []) if updated_mgmt_doc else False, @@ -1521,41 +1537,84 @@ def sync_ranking_storage_fields(target_date=None, force_update=False, max_retrie # 遍历data数组中的每个项目 for data_item in data_array: try: - mix_name = data_item.get('mix_name') + mix_name = data_item.get('mix_name', '').strip() + + # 🚫 跳过无效数据:确保mix_name不为空 + if not mix_name or mix_name == "" or mix_name.lower() == "null": + logging.warning(f"跳过空的或无效的mix_name记录: {data_item.get('_id', 'unknown')}") + continue # 不添加到updated_data_array,直接跳过 # 🔧 增强逻辑:如果mix_name为空,尝试通过其他方式找到对应数据 source_data = None + + # 构建日期查询条件 - 查找当天的数据 + start_of_day = datetime.combine(target_date_obj, datetime.min.time()) + end_of_day = datetime.combine(target_date_obj, datetime.max.time()) + date_query = { + "$or": [ + {"created_at": {"$gte": start_of_day, "$lte": end_of_day}}, + {"last_updated": {"$gte": start_of_day, "$lte": end_of_day}} + ] + } + if mix_name: - # 优先使用mix_name查找 - 从Rankings_management获取数据 - source_data = rankings_management_collection.find_one({"mix_name": mix_name}) + # 优先使用mix_name查找 - 从Rankings_management获取数据,添加日期过滤 + query = {"mix_name": mix_name} + query.update(date_query) + source_data = rankings_management_collection.find_one(query) # 如果通过mix_name没找到数据,或者mix_name为空,尝试其他匹配方式 if not source_data: # 方法1:通过mix_id匹配(如果有的话) mix_id = data_item.get('mix_id') if mix_id: - source_data = rankings_management_collection.find_one({"mix_id": mix_id}) + query = {"mix_id": mix_id} + query.update(date_query) + source_data = rankings_management_collection.find_one(query) if source_data: logging.info(f"通过mix_id找到数据: {mix_id} -> {source_data.get('mix_name', 'N/A')}") # 方法2:如果还是没找到,尝试通过title匹配 if not source_data: title = data_item.get('title') - if title: - source_data = rankings_management_collection.find_one({"mix_name": title}) + if title and title.strip(): + query = {"mix_name": title.strip()} + query.update(date_query) + source_data = rankings_management_collection.find_one(query) if source_data: logging.info(f"通过title找到数据: {title} -> {source_data.get('mix_name', 'N/A')}") # 如果找到了源数据,更新mix_name(如果原来为空的话) if source_data and not mix_name: - mix_name = source_data.get('mix_name', '') - data_item['mix_name'] = mix_name - logging.info(f"修复空的mix_name: {data_item.get('title', 'N/A')} -> {mix_name}") + mix_name = source_data.get('mix_name', '').strip() + if mix_name: + data_item['mix_name'] = mix_name + logging.info(f"修复空的mix_name: {data_item.get('title', 'N/A')} -> {mix_name}") + else: + logging.warning(f"源数据中的mix_name也为空,跳过此记录") + continue # 跳过无效记录 - # 如果还是没有找到源数据,保持原数据不变 + # 如果还是没有找到源数据,检查是否有锁定字段需要保护 if not source_data: logging.warning(f"无法找到对应的源数据: mix_name={mix_name}, mix_id={data_item.get('mix_id')}, title={data_item.get('title')}") - updated_data_array.append(data_item) + + # 检查是否有锁定字段,如果有锁定字段,保持原数据不变 + field_lock_status = ranking_doc.get('field_lock_status', {}) + has_locked_fields = any([ + field_lock_status.get('Manufacturing_Field_locked', False), + field_lock_status.get('Copyright_field_locked', False), + field_lock_status.get('Novel_IDs_locked', False), + field_lock_status.get('Anime_IDs_locked', False), + field_lock_status.get('Drama_IDs_locked', False) + ]) + + if has_locked_fields: + logging.info(f"保持锁定字段不变: {mix_name} (无源数据但有锁定字段)") + updated_data_array.append(data_item) + else: + # 只有当mix_name有效且没有锁定字段时才保留记录 + if mix_name and mix_name.strip(): + updated_data_array.append(data_item) continue # 检查是否需要更新 - 包含所有Rankings_management字段