From 4057620cf40ea8d8ac9feab616b184b29c1f49ee Mon Sep 17 00:00:00 2001
From: Qyir <13521889462@163.com>
Date: Thu, 6 Nov 2025 18:13:31 +0800
Subject: [PATCH] Optimize locked-field logic
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../handlers/Rankings/rank_data_scraper.py |  90 +++++---
 backend/routers/rank_api_routes.py         | 206 +++++++++++++-----
 frontend/vite.config.js                    |  10 +
 3 files changed, 230 insertions(+), 76 deletions(-)

diff --git a/backend/handlers/Rankings/rank_data_scraper.py b/backend/handlers/Rankings/rank_data_scraper.py
index cf6b656..d626ff6 100644
--- a/backend/handlers/Rankings/rank_data_scraper.py
+++ b/backend/handlers/Rankings/rank_data_scraper.py
@@ -1503,6 +1503,7 @@ class DouyinPlayVVScraper:
             doc = {
                 'batch_time': batch_time,
                 'mix_name': mix_name,
+                'mix_id': item.get('mix_id', ''),  # collection ID
                 'video_url': item.get('video_url', ''),
                 'playcount': item.get('formatted', ''),
                 'play_vv': item.get('play_vv', 0),
@@ -1538,7 +1539,7 @@ class DouyinPlayVVScraper:
             max_play_vv = max(doc['play_vv'] for doc in documents) if documents else 0

             logging.info(f'MongoDB save stats: total play_vv={total_play_vv:,}, max play_vv={max_play_vv:,}')
-            logging.info(f'Saved fields: batch_time, mix_name, video_url, playcount, play_vv, request_id, rank, cover_image_url_original, cover_image_url, cover_upload_success, series_author, Manufacturing_Field, Copyright_field, desc, updated_to_episode')
+            logging.info(f'Saved fields: batch_time, mix_name, mix_id, video_url, playcount, play_vv, request_id, rank, cover_image_url_original, cover_image_url, cover_upload_success, series_author, Manufacturing_Field, Copyright_field, desc, updated_to_episode')

             # Cover image processing stats
             cover_count = sum(1 for doc in documents if doc.get('cover_image_url'))
@@ -1677,19 +1678,20 @@ class DouyinPlayVVScraper:
                 'data_status': target_doc.get('data_status', ''),
                 'realtime_saved': target_doc.get('realtime_saved', True),
                 'created_at': target_doc.get('created_at', datetime.now()),
-                'last_updated': target_doc['last_updated'],
-                # New: content classification fields (lists of drama IDs)
-                'Novel_IDs': target_doc.get('Novel_IDs', []),
-                'Anime_IDs': target_doc.get('Anime_IDs', []),
-                'Drama_IDs': target_doc.get('Drama_IDs', [])
+                'last_updated': target_doc['last_updated']
+                # Note: the classification fields Novel_IDs, Anime_IDs and Drama_IDs are not set here,
+                # because scraped data does not carry these user-maintained classifications.
+                # They are handled only by the protection logic below, so existing data is not overwritten.
             }

-            # Locked-field protection logic: Manufacturing_Field and Copyright_field
-            # Rule: if the existing record has a value for these fields, skip the update (keep the original value);
-            # if the existing value is empty and the new data has one, update it;
+            # Locked-field protection logic: consult field_lock_status to decide whether a locked field may be updated.
+            # Rules: if the user has locked a field (it is flagged in field_lock_status), skip the update;
+            # if the field is unlocked but the existing record has a value, skip the update (keep the original value);
+            # if the field is unlocked, the existing value is empty and the new data has one, update it;
             # for a new record, use the values from the new data.
             if existing_doc:
                 # Record already exists; apply locked-field protection
+                existing_field_lock_status = existing_doc.get('field_lock_status', {})
                 existing_manufacturing = existing_doc.get('Manufacturing_Field', '')
                 existing_copyright = existing_doc.get('Copyright_field', '')
                 existing_novel_ids = existing_doc.get('Novel_IDs', [])
@@ -1698,12 +1700,17 @@ class DouyinPlayVVScraper:
                 new_manufacturing = target_doc.get('Manufacturing_Field', '')
                 new_copyright = target_doc.get('Copyright_field', '')
-                new_novel_ids = target_doc.get('Novel_IDs', [])
-                new_anime_ids = target_doc.get('Anime_IDs', [])
-                new_drama_ids = target_doc.get('Drama_IDs', [])
+                # Note: do not read the classification fields from target_doc; scraped data does not contain them.
+                # Classification fields can only be set manually by the user, so the scraper must not update them.
+                new_novel_ids = []  # not present in scraped data
+                new_anime_ids = []  # not present in scraped data
+                new_drama_ids = []  # not present in scraped data

                 # Manufacturing_Field protection
-                if existing_manufacturing:
+                if existing_field_lock_status.get('Manufacturing_Field_locked', False):
+                    # Field is locked by the user; skip the update
+                    logging.info(f'[locked field] Skipping Manufacturing_Field update: {mix_name} -> field is locked by the user')
+                elif existing_manufacturing:
                     # Existing field has a value; skip the update (do not add it to set_fields)
                     logging.info(f'[locked field] Skipping Manufacturing_Field update: {mix_name} -> keeping existing value "{existing_manufacturing}"')
                 elif new_manufacturing:
@@ -1713,7 +1720,10 @@ class DouyinPlayVVScraper:
                 # If both the existing and the new value are empty, leave the field unset

                 # Copyright_field protection
-                if existing_copyright:
+                if existing_field_lock_status.get('Copyright_field_locked', False):
+                    # Field is locked by the user; skip the update
+                    logging.info(f'[locked field] Skipping Copyright_field update: {mix_name} -> field is locked by the user')
+                elif existing_copyright:
                     # Existing field has a value; skip the update (do not add it to set_fields)
                     logging.info(f'[locked field] Skipping Copyright_field update: {mix_name} -> keeping existing value "{existing_copyright}"')
                 elif new_copyright:
@@ -1723,7 +1733,10 @@ class DouyinPlayVVScraper:
                 # If both the existing and the new value are empty, leave the field unset

                 # Novel_IDs protection
-                if existing_novel_ids and len(existing_novel_ids) > 0:
+                if existing_field_lock_status.get('Novel_IDs_locked', False):
+                    # Field is locked by the user; skip the update
+                    logging.info(f'[locked field] Skipping Novel_IDs update: {mix_name} -> field is locked by the user')
+                elif existing_novel_ids and len(existing_novel_ids) > 0:
                     # Existing field has a value; skip the update (do not add it to set_fields)
                     logging.info(f'[locked field] Skipping Novel_IDs update: {mix_name} -> keeping existing value {existing_novel_ids}')
                 elif new_novel_ids and len(new_novel_ids) > 0:
@@ -1733,7 +1746,10 @@ class DouyinPlayVVScraper:
                 # If both the existing and the new value are empty, leave the field unset

                 # Anime_IDs protection
-                if existing_anime_ids and len(existing_anime_ids) > 0:
+                if existing_field_lock_status.get('Anime_IDs_locked', False):
+                    # Field is locked by the user; skip the update
+                    logging.info(f'[locked field] Skipping Anime_IDs update: {mix_name} -> field is locked by the user')
+                elif existing_anime_ids and len(existing_anime_ids) > 0:
                     # Existing field has a value; skip the update (do not add it to set_fields)
                     logging.info(f'[locked field] Skipping Anime_IDs update: {mix_name} -> keeping existing value {existing_anime_ids}')
                 elif new_anime_ids and len(new_anime_ids) > 0:
@@ -1743,7 +1759,10 @@ class DouyinPlayVVScraper:
                 # If both the existing and the new value are empty, leave the field unset

                 # Drama_IDs protection
-                if existing_drama_ids and len(existing_drama_ids) > 0:
+                if existing_field_lock_status.get('Drama_IDs_locked', False):
+                    # Field is locked by the user; skip the update
+                    logging.info(f'[locked field] Skipping Drama_IDs update: {mix_name} -> field is locked by the user')
+                elif existing_drama_ids and len(existing_drama_ids) > 0:
                     # Existing field has a value; skip the update (do not add it to set_fields)
                     logging.info(f'[locked field] Skipping Drama_IDs update: {mix_name} -> keeping existing value {existing_drama_ids}')
                 elif new_drama_ids and len(new_drama_ids) > 0:
@@ -1753,13 +1772,13 @@ class DouyinPlayVVScraper:
                 # If both the existing and the new value are empty, leave the field unset

             else:
-                # New record: use the values from the new data (may be empty)
+                # New record: set only the non-classification fields
                 set_fields['Manufacturing_Field'] = target_doc.get('Manufacturing_Field', '')
                 set_fields['Copyright_field'] = target_doc.get('Copyright_field', '')
-                set_fields['Novel_IDs'] = target_doc.get('Novel_IDs', [])
-                set_fields['Anime_IDs'] = target_doc.get('Anime_IDs', [])
-                set_fields['Drama_IDs'] = target_doc.get('Drama_IDs', [])
-                logging.info(f'[locked field] New record, setting initial locked fields: {mix_name}')
+                # Note: do not set the classification fields Novel_IDs, Anime_IDs, Drama_IDs;
+                # scraped data does not carry these user-maintained classifications.
+                # A new record's classification fields stay empty until the user fills them in.
+                logging.info(f'[locked field] New record, setting initial non-classification fields: {mix_name}')

             # Upsert: update the record if it exists, insert it otherwise
             upsert_result = target_collection.update_one(
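The five protection blocks above repeat the same three-step precedence. A minimal sketch of that decision, where resolve_protected_field is a hypothetical helper name, not a function in this patch:

    def resolve_protected_field(field, lock_status, existing_value, new_value, set_fields):
        """Precedence: user lock > existing non-empty value > non-empty scraped value."""
        if lock_status.get(f'{field}_locked', False):
            return  # locked by the user: never touch it
        if existing_value:
            return  # unlocked but already populated: keep the original value
        if new_value:
            set_fields[field] = new_value  # unlocked and empty: accept the scraped value
        # both empty: leave the field unset

Collapsing the blocks this way would also keep the five branches from drifting apart, though the patch keeps them inline.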
@@ -2316,13 +2335,15 @@ class DouyinPlayVVScraper:
         return []

     def _simulate_comment_scrolling(self, video_id: str, max_scroll_attempts: int = 10, scroll_delay: float = 2.0,
-                                    document_id=None, episode_number: int = 0, mix_name: str = '', mix_id: str = '') -> list:
+                                    document_id=None, episode_number: int = 0, mix_name: str = '', mix_id: str = '',
+                                    max_comments: int = 100) -> list:
         """
         Simulate a user's asynchronous scrolling, swiping up to load more comments
         Args:
             video_id: video ID
             max_scroll_attempts: maximum number of scroll attempts, default 10
             scroll_delay: delay after each scroll (seconds), default 2
+            max_comments: maximum number of comments per episode, default 100
         Returns:
             list: all collected comment data
         """
@@ -2370,7 +2391,7 @@ class DouyinPlayVVScraper:
                 # Submit the monitor task as well - it watches the scroll task's state (5-hour timeout)
                 monitor_future = executor.submit(self._async_monitor_task_with_state, video_id,
                                                  collected_comment_ids, shared_state, 18000,
-                                                 document_id, episode_number, mix_name, mix_id)
+                                                 document_id, episode_number, mix_name, mix_id, max_comments)

                 # Wait for both tasks to finish
                 scroll_result = scroll_future.result()
@@ -2418,6 +2439,12 @@ class DouyinPlayVVScraper:
                 attempt += 1
                 logging.info(f'Scroll-up attempt {attempt}')

+                # Check whether the monitor task has signalled a stop
+                with shared_state['lock']:
+                    if shared_state['scroll_completed']:
+                        logging.info('Stop signal received from monitor task; ending scroll task')
+                        break
+
                 # Record the position before scrolling
                 current_position = self.driver.execute_script("return window.pageYOffset;")
@@ -2679,7 +2706,8 @@ class DouyinPlayVVScraper:
         return all_comments

     def _async_monitor_task_with_state(self, video_id: str, collected_comment_ids: set, shared_state: dict, timeout: float,
-                                       document_id=None, episode_number: int = 0, mix_name: str = '', mix_id: str = '') -> list:
+                                       document_id=None, episode_number: int = 0, mix_name: str = '', mix_id: str = '',
+                                       max_comments: int = 100) -> list:
         """Stateful async monitor task - monitor comments and watch the scroll task's state"""
         # Make sure episode_number is an integer
         try:
@@ -2755,6 +2783,13 @@ class DouyinPlayVVScraper:
                     if no_new_comments_count % 30 == 0:
                         logging.info(f'Monitoring... {current_comment_count} comments so far, waiting for the scroll task to finish')

+                # Check whether the comment count limit has been reached
+                if current_comment_count >= max_comments:
+                    logging.info(f'Collected {current_comment_count} comments, reached the limit of {max_comments}; signalling the scroll task to stop')
+                    with shared_state['lock']:
+                        shared_state['scroll_completed'] = True
+                    break
+
                 # Wait briefly, then continue monitoring
                 time.sleep(1)
@@ -2772,7 +2807,8 @@ class DouyinPlayVVScraper:
             time.sleep(2)

         logging.info(f'Monitor task finished; collected {len(all_comments)} comments in total')
-        return all_comments
+        # Make sure at most the first max_comments comments are returned
+        return all_comments[:max_comments]

     def _scroll_to_comment_section(self):
         """Scroll to the comment section"""
@@ -3210,7 +3246,7 @@ class DouyinPlayVVScraper:
         # Start the scrolling mechanism to load more comments
         logging.info(f'Starting scrolling to load comments for video {video_id}')
         scrolled_comments = self._simulate_comment_scrolling(video_id, max_scroll_attempts=15, scroll_delay=2.0,
-                                                             document_id=document_id, episode_number=episode_number, mix_name=mix_name, mix_id=mix_id)
+                                                             document_id=document_id, episode_number=episode_number, mix_name=mix_name, mix_id=mix_id, max_comments=100)

         # If scrolling collected comments, use them directly
         if scrolled_comments:
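The scroll and monitor tasks coordinate through a lock-protected flag rather than task cancellation. A stripped-down sketch of that handshake, assuming the same shared_state shape used above; the sleep intervals and the pre-filled comment list are stand-ins for real scrolling and collection:

    import threading
    import time
    from concurrent.futures import ThreadPoolExecutor

    shared_state = {'lock': threading.Lock(), 'scroll_completed': False}
    comments = list(range(150))  # stand-in for comments collected elsewhere

    def scroll_task():
        while True:
            with shared_state['lock']:
                if shared_state['scroll_completed']:
                    break  # the monitor asked us to stop
            time.sleep(0.1)  # stand-in for one scroll attempt

    def monitor_task(max_comments):
        while len(comments) < max_comments:
            time.sleep(0.1)
        with shared_state['lock']:
            shared_state['scroll_completed'] = True  # signal the scroll task
        return comments[:max_comments]  # enforce the cap on the way out

    with ThreadPoolExecutor(max_workers=2) as executor:
        scroll_future = executor.submit(scroll_task)
        monitor_future = executor.submit(monitor_task, 100)
        print(len(monitor_future.result()))  # -> 100

Both sides slice or test under the same lock-guarded flag, which is why the patch can stop scrolling early without tearing down the executor.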
diff --git a/backend/routers/rank_api_routes.py b/backend/routers/rank_api_routes.py
index 2f07136..513e882 100644
--- a/backend/routers/rank_api_routes.py
+++ b/backend/routers/rank_api_routes.py
@@ -65,6 +65,81 @@ def format_time(time_obj):
     else:
         return str(time_obj)

+def parse_date_string(date_str):
+    """Generic date-parsing helper"""
+    try:
+        if isinstance(date_str, str):
+            return datetime.strptime(date_str, '%Y-%m-%d').date()
+        return date_str
+    except (ValueError, TypeError):
+        logging.warning(f"Could not parse date string: {date_str}")
+        return None
+
+def find_management_data(query, target_date=None):
+    """
+    Generic management-data lookup that prefers querying by mix_id
+
+    Args:
+        query: dict of query conditions; may contain mix_id, mix_name, etc.
+        target_date: target date, used for date filtering
+
+    Returns:
+        The matching document, or None
+    """
+    try:
+        # If the query contains a mix_id, query by mix_id first
+        if 'mix_id' in query and query['mix_id']:
+            mix_id_query = {"mix_id": query['mix_id']}
+
+            # Add a date filter (if a target_date was provided)
+            if target_date:
+                if isinstance(target_date, str):
+                    target_date = parse_date_string(target_date)
+                if target_date:
+                    start_of_day = datetime.combine(target_date, datetime.min.time())
+                    end_of_day = datetime.combine(target_date, datetime.max.time())
+                    mix_id_query.update({
+                        "$or": [
+                            {"created_at": {"$gte": start_of_day, "$lte": end_of_day}},
+                            {"last_updated": {"$gte": start_of_day, "$lte": end_of_day}}
+                        ]
+                    })
+
+            result = rankings_management_collection.find_one(mix_id_query)
+            if result:
+                logging.info(f"Found management data via mix_id: {query['mix_id']}")
+                return result
+
+        # If nothing was found via mix_id, or there is no mix_id, try the remaining conditions
+        fallback_query = {k: v for k, v in query.items() if k != 'mix_id'}
+
+        # Add a date filter (if a target_date was provided)
+        if target_date and fallback_query:
+            if isinstance(target_date, str):
+                target_date = parse_date_string(target_date)
+            if target_date:
+                start_of_day = datetime.combine(target_date, datetime.min.time())
+                end_of_day = datetime.combine(target_date, datetime.max.time())
+                fallback_query.update({
+                    "$or": [
+                        {"created_at": {"$gte": start_of_day, "$lte": end_of_day}},
+                        {"last_updated": {"$gte": start_of_day, "$lte": end_of_day}}
+                    ]
+                })
+
+        if fallback_query:
+            result = rankings_management_collection.find_one(fallback_query)
+            if result:
+                logging.info(f"Found management data via fallback query: {fallback_query}")
+                return result
+
+        logging.warning(f"No matching management data found: {query}")
+        return None
+
+    except Exception as e:
+        logging.error(f"Error while querying management data: {e}")
+        return None
+
 def sort_ranking_data(ranking_data, sort_by, sort_order='desc'):
     """
     Dynamically sort ranking data
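find_management_data builds the same whole-day window in both branches. Seen in isolation, the filter it sends to MongoDB for a given date looks like the following sketch; the mix_id value is illustrative:

    from datetime import date, datetime

    target_date = date(2025, 11, 6)
    start_of_day = datetime.combine(target_date, datetime.min.time())  # 00:00:00
    end_of_day = datetime.combine(target_date, datetime.max.time())    # 23:59:59.999999

    query = {
        "mix_id": "7123456789",  # illustrative ID
        "$or": [
            {"created_at": {"$gte": start_of_day, "$lte": end_of_day}},
            {"last_updated": {"$gte": start_of_day, "$lte": end_of_day}},
        ],
    }
    # rankings_management_collection.find_one(query) then returns the first
    # document created or updated on that day, or None.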
"$lte": end_of_day}}, + {"last_updated": {"$gte": start_of_day, "$lte": end_of_day}} + ] + }) + + result = rankings_management_collection.find_one(mix_id_query) + if result: + logging.info(f"通过mix_id找到管理数据: {query['mix_id']}") + return result + + # 如果通过mix_id没找到,或者没有mix_id,尝试其他查询条件 + fallback_query = {k: v for k, v in query.items() if k != 'mix_id'} + + # 添加日期过滤(如果提供了target_date) + if target_date and fallback_query: + if isinstance(target_date, str): + target_date = parse_date_string(target_date) + if target_date: + start_of_day = datetime.combine(target_date, datetime.min.time()) + end_of_day = datetime.combine(target_date, datetime.max.time()) + fallback_query.update({ + "$or": [ + {"created_at": {"$gte": start_of_day, "$lte": end_of_day}}, + {"last_updated": {"$gte": start_of_day, "$lte": end_of_day}} + ] + }) + + if fallback_query: + result = rankings_management_collection.find_one(fallback_query) + if result: + logging.info(f"通过备用查询找到管理数据: {fallback_query}") + return result + + logging.warning(f"未找到匹配的管理数据: {query}") + return None + + except Exception as e: + logging.error(f"查询管理数据时出错: {e}") + return None + def sort_ranking_data(ranking_data, sort_by, sort_order='desc'): """ 对榜单数据进行动态排序 @@ -1086,6 +1161,7 @@ def update_drama_info(): # 准备更新字段 update_fields = {} + field_lock_updates = {} # 检查并添加需要更新的字段 if 'title' in data: @@ -1094,8 +1170,12 @@ def update_drama_info(): update_fields['series_author'] = data['series_author'] if 'Manufacturing_Field' in data: update_fields['Manufacturing_Field'] = data['Manufacturing_Field'] + # 标记制作方字段已被用户锁定 + field_lock_updates['field_lock_status.Manufacturing_Field_locked'] = True if 'Copyright_field' in data: update_fields['Copyright_field'] = data['Copyright_field'] + # 标记版权方字段已被用户锁定 + field_lock_updates['field_lock_status.Copyright_field_locked'] = True if 'desc' in data: update_fields['desc'] = data['desc'] if 'play_vv' in data: @@ -1108,6 +1188,17 @@ def update_drama_info(): if 'timeline_data' in data: update_fields['timeline_data'] = data['timeline_data'] + # 检查分类字段的锁定状态 + if 'Novel_IDs' in data: + update_fields['Novel_IDs'] = data['Novel_IDs'] + field_lock_updates['field_lock_status.Novel_IDs_locked'] = True + if 'Anime_IDs' in data: + update_fields['Anime_IDs'] = data['Anime_IDs'] + field_lock_updates['field_lock_status.Anime_IDs_locked'] = True + if 'Drama_IDs' in data: + update_fields['Drama_IDs'] = data['Drama_IDs'] + field_lock_updates['field_lock_status.Drama_IDs_locked'] = True + if not update_fields: return jsonify({"success": False, "message": "没有提供需要更新的字段"}) @@ -1126,21 +1217,38 @@ def update_drama_info(): }) # 1. 更新Rankings_management数据库 + mgmt_update_data = update_fields.copy() + mgmt_update_data.update(field_lock_updates) # 添加锁定状态更新 + result_mgmt = rankings_management_collection.update_many( {"mix_name": mix_name}, - {"$set": update_fields} + {"$set": mgmt_update_data} ) # 2. 
@@ -1443,21 +1551,37 @@ def validate_and_fix_classification_exclusivity():
                     update_fields['Anime_IDs'] = [id for id in anime_ids if id != mix_id]
                     update_fields['Drama_IDs'] = drama_ids

-                # Update Rankings_management
-                rankings_management_collection.update_one(
-                    {"mix_name": mix_name},
-                    {"$set": update_fields}
-                )
+                # Update Rankings_management - prefer mix_id
+                if mix_id:
+                    rankings_management_collection.update_one(
+                        {"mix_id": mix_id},
+                        {"$set": update_fields}
+                    )
+                else:
+                    rankings_management_collection.update_one(
+                        {"mix_name": mix_name},
+                        {"$set": update_fields}
+                    )

-                # Update Ranking_storage
-                collection.update_many(
-                    {"data.mix_name": mix_name},
-                    {"$set": {
-                        f"data.$.Novel_IDs": update_fields['Novel_IDs'],
-                        f"data.$.Anime_IDs": update_fields['Anime_IDs'],
-                        f"data.$.Drama_IDs": update_fields['Drama_IDs']
-                    }}
-                )
+                # Update Ranking_storage - prefer mix_id
+                if mix_id:
+                    collection.update_many(
+                        {"data.mix_id": mix_id},
+                        {"$set": {
+                            f"data.$.Novel_IDs": update_fields['Novel_IDs'],
+                            f"data.$.Anime_IDs": update_fields['Anime_IDs'],
+                            f"data.$.Drama_IDs": update_fields['Drama_IDs']
+                        }}
+                    )
+                else:
+                    collection.update_many(
+                        {"data.mix_name": mix_name},
+                        {"$set": {
+                            f"data.$.Novel_IDs": update_fields['Novel_IDs'],
+                            f"data.$.Anime_IDs": update_fields['Anime_IDs'],
+                            f"data.$.Drama_IDs": update_fields['Drama_IDs']
+                        }}
+                    )

                 fixed_count += 1
                 logging.info(f"Fixed classification conflict: {mix_name} kept in the {keep_classification} category")
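The if/else branches in each update differ only in their filter. A hypothetical helper, not part of this patch, that captures the mix_id-first preference could collapse them:

    def preferred_filter(mix_id, mix_name, nested=False):
        """Build a filter that prefers mix_id over mix_name; hypothetical helper."""
        key, value = ('mix_id', mix_id) if mix_id else ('mix_name', mix_name)
        return {f"data.{key}": value} if nested else {key: value}

    # preferred_filter('7123456789', 'Some Drama')        -> {'mix_id': '7123456789'}
    # preferred_filter(None, 'Some Drama', nested=True)   -> {'data.mix_name': 'Some Drama'}

mix_id is preferred because titles can collide or be renamed, while the collection ID is stable.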
@@ -1544,45 +1668,29 @@ def sync_ranking_storage_fields(target_date=None, force_update=False, max_retrie
                     logging.warning(f"Skipping record with an empty or invalid mix_name: {data_item.get('_id', 'unknown')}")
                     continue  # Skip it without adding it to updated_data_array

-                # 🔧 Enhanced logic: if mix_name is empty, try to find the matching data some other way
+                # 🔧 Optimized logic: prefer querying by mix_id for better accuracy
                 source_data = None
+                mix_id = data_item.get('mix_id')

-                # Build the date condition - look for that day's data
-                start_of_day = datetime.combine(target_date_obj, datetime.min.time())
-                end_of_day = datetime.combine(target_date_obj, datetime.max.time())
-                date_query = {
-                    "$or": [
-                        {"created_at": {"$gte": start_of_day, "$lte": end_of_day}},
-                        {"last_updated": {"$gte": start_of_day, "$lte": end_of_day}}
-                    ]
-                }
-
+                # Use the generic lookup helper, preferring mix_id
+                query_conditions = {}
+                if mix_id:
+                    query_conditions['mix_id'] = mix_id
                 if mix_name:
-                    # Prefer the mix_name lookup - fetch from Rankings_management with the date filter
-                    query = {"mix_name": mix_name}
-                    query.update(date_query)
-                    source_data = rankings_management_collection.find_one(query)
+                    query_conditions['mix_name'] = mix_name

-                # If nothing was found via mix_name, or mix_name is empty, try other ways to match
+                # Run the lookup through find_management_data
+                if query_conditions:
+                    source_data = find_management_data(query_conditions, target_date)
+
+                # If still nothing was found, try matching by title
                 if not source_data:
-                    # Method 1: match by mix_id (if present)
-                    mix_id = data_item.get('mix_id')
-                    if mix_id:
-                        query = {"mix_id": mix_id}
-                        query.update(date_query)
-                        source_data = rankings_management_collection.find_one(query)
+                    title = data_item.get('title')
+                    if title and title.strip():
+                        title_query = {"mix_name": title.strip()}
+                        source_data = find_management_data(title_query, target_date)
                         if source_data:
-                            logging.info(f"Found data via mix_id: {mix_id} -> {source_data.get('mix_name', 'N/A')}")
-
-                    # Method 2: if still nothing was found, try matching by title
-                    if not source_data:
-                        title = data_item.get('title')
-                        if title and title.strip():
-                            query = {"mix_name": title.strip()}
-                            query.update(date_query)
-                            source_data = rankings_management_collection.find_one(query)
-                            if source_data:
-                                logging.info(f"Found data via title: {title} -> {source_data.get('mix_name', 'N/A')}")
+                            logging.info(f"Found data via title: {title} -> {source_data.get('mix_name', 'N/A')}")

                 # If source data was found, backfill mix_name (when it was empty)
                 if source_data and not mix_name:
diff --git a/frontend/vite.config.js b/frontend/vite.config.js
index 63d1997..d685998 100644
--- a/frontend/vite.config.js
+++ b/frontend/vite.config.js
@@ -13,4 +13,14 @@ export default defineConfig({
       '@': fileURLToPath(new URL('./src', import.meta.url))
     },
   },
+  server: {
+    port: 5174,
+    proxy: {
+      '/api': {
+        target: 'http://localhost:5001',
+        changeOrigin: true,
+        secure: false
+      }
+    }
+  }
 })
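With the new server block, the Vite dev server on port 5174 forwards any /api request to the backend on port 5001, so the frontend can use relative URLs in development. A quick way to confirm the proxy from Python; the /api/some-route path is hypothetical, substitute a real route from rank_api_routes.py:

    import urllib.request

    # The dev server receives this request and, per the proxy rule above,
    # forwards it to http://localhost:5001/api/some-route (hypothetical path).
    with urllib.request.urlopen('http://localhost:5174/api/some-route') as resp:
        print(resp.status, resp.headers.get('Content-Type'))

changeOrigin rewrites the Host header so the backend sees localhost:5001, and secure: false permits self-signed certificates if the target is ever switched to HTTPS.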