diff --git a/backend/Timer_worker.py b/backend/Timer_worker.py index 9b9641b..ec5c0c5 100644 --- a/backend/Timer_worker.py +++ b/backend/Timer_worker.py @@ -277,6 +277,9 @@ class DouyinAutoScheduler: "data": [] } + # 获取Rankings_management集合用于补充详细信息 + rankings_management_collection = db['Rankings_management'] + # 生成排序后的榜单数据 for i, item in enumerate(videos_with_growth, 1): video = item["video"] @@ -290,16 +293,46 @@ class DouyinAutoScheduler: yesterday_rank = item["yesterday_data"].get("rank", 0) rank_change = yesterday_rank - i + # 🔍 从Rankings_management获取详细信息 + management_data = rankings_management_collection.find_one({"mix_name": mix_name}) + ranking_item = { + # 🎯 核心榜单字段 "rank": i, "title": mix_name, + "mix_name": mix_name, # 确保包含mix_name字段用于同步 "play_vv": current_play_vv, - "author": video.get("author", ""), + "series_author": video.get("series_author", ""), "video_id": video_id, "video_url": video.get("video_url", ""), "cover_image_url": video.get("cover_image_url", ""), "playcount_str": video.get("playcount", ""), - # 时间轴对比数据 + + # 📋 从Rankings_management获取的详细字段 + "batch_id": management_data.get("batch_id", "") if management_data else "", + "batch_time": management_data.get("batch_time") if management_data else None, + "item_sequence": management_data.get("item_sequence", 0) if management_data else 0, + "mix_id": management_data.get("mix_id", "") if management_data else "", + "playcount": management_data.get("playcount", "") if management_data else "", + "request_id": management_data.get("request_id", "") if management_data else "", + "cover_image_url_original": management_data.get("cover_image_url_original", "") if management_data else "", + "cover_upload_success": management_data.get("cover_upload_success", True) if management_data else True, + "cover_backup_urls": management_data.get("cover_backup_urls", []) if management_data else [], + "desc": management_data.get("desc", "") if management_data else "", + "updated_to_episode": management_data.get("updated_to_episode", 0) if management_data else 0, + "episode_video_ids": management_data.get("episode_video_ids", []) if management_data else [], + "episode_details": management_data.get("episode_details", []) if management_data else [], + "data_status": management_data.get("data_status", "") if management_data else "", + "realtime_saved": management_data.get("realtime_saved", True) if management_data else True, + "created_at": management_data.get("created_at") if management_data else None, + "last_updated": management_data.get("last_updated") if management_data else None, + "Manufacturing_Field": management_data.get("Manufacturing_Field", "") if management_data else "", + "Copyright_field": management_data.get("Copyright_field", "") if management_data else "", + "Novel_IDs": management_data.get("Novel_IDs", []) if management_data else [], + "Anime_IDs": management_data.get("Anime_IDs", []) if management_data else [], + "Drama_IDs": management_data.get("Drama_IDs", []) if management_data else [], + + # 📊 时间轴对比数据(重要:包含播放量差值) "timeline_data": { "is_new": item["is_new"], "rank_change": rank_change, @@ -330,6 +363,29 @@ class DouyinAutoScheduler: logging.info(f"📝 创建了新的今日榜单数据(第{existing_count + 1}次计算,包含最新差值)") logging.info(f"🔖 计算ID: {comprehensive_ranking['calculation_id']}") + # 📊 检查数据完整性:统计从Rankings_management成功获取详细信息的项目数量 + total_items = len(comprehensive_ranking["data"]) + items_with_management_data = 0 + items_with_manufacturing = 0 + items_with_copyright = 0 + + for item in comprehensive_ranking["data"]: + # 
检查是否从Rankings_management获取到了数据
+                if item.get("batch_id") or item.get("desc") or item.get("Manufacturing_Field") or item.get("Copyright_field"):
+                    items_with_management_data += 1
+                if item.get("Manufacturing_Field"):
+                    items_with_manufacturing += 1
+                if item.get("Copyright_field"):
+                    items_with_copyright += 1
+
+            print(f"📊 数据完整性统计:")
+            print(f"   总项目数: {total_items}")
+            print(f"   从Rankings_management获取到详细信息: {items_with_management_data}")
+            print(f"   包含Manufacturing_Field: {items_with_manufacturing}")
+            print(f"   包含Copyright_field: {items_with_copyright}")
+
+            logging.info(f"📊 数据完整性: 总{total_items}项,获取详细信息{items_with_management_data}项,Manufacturing_Field: {items_with_manufacturing},Copyright_field: {items_with_copyright}")
+
             # 统计信息
             new_count = sum(1 for item in comprehensive_ranking["data"] if item["timeline_data"]["is_new"])
             print(f"✅ 时间轴对比榜单生成成功")
@@ -358,13 +414,165 @@ class DouyinAutoScheduler:
             import traceback
             logging.error(f"详细错误信息: {traceback.format_exc()}")

+    def check_and_sync_missing_fields(self):
+        """实时检查并同步当天缺失字段"""
+        try:
+            from database import db
+
+            # 只检查当天的数据
+            today = date.today()
+            today_str = today.strftime('%Y-%m-%d')
+
+            # 首先检查 Rankings_management 是否有数据
+            rankings_management_collection = db['Rankings_management']
+            management_count = rankings_management_collection.count_documents({})
+
+            if management_count == 0:
+                # Rankings_management 没有数据,说明还没有抓取,直接返回
+                return
+
+            rankings_collection = db['Ranking_storage']
+            # 检查多个关键字段(包括新增的分类字段)。预检查与逐日检查必须使用同一份字段列表,
+            # 否则只缺分类字段的记录会被预检查漏掉,永远触发不了同步
+            key_fields = ['Manufacturing_Field', 'Copyright_field', 'desc', 'series_author', 'Novel_IDs', 'Anime_IDs', 'Drama_IDs']
+
+            # 构造缺失字段查询条件(字段不存在、为None或为空)
+            missing_conditions = []
+            for field in key_fields:
+                missing_conditions.extend([
+                    {field: {"$exists": False}},
+                    {field: None},
+                    {field: ""}
+                ])
+
+            today_missing_count = rankings_collection.count_documents({
+                "date": today_str,
+                "$or": missing_conditions
+            })
+
+            # 如果今天没有缺失数据,静默返回
+            if today_missing_count == 0:
+                return
+
+            logging.info(f"🔍 检测到今天有 {today_missing_count} 条缺失字段,Rankings_management有 {management_count} 条数据,开始实时同步...")
+
+            # 只处理当天的数据
+            dates_to_check = [today_str]
+
+            total_missing = 0
+            total_synced = 0
+
+            for check_date in dates_to_check:
+                # 查询该日期缺失字段的数据(复用上面构造的key_fields与missing_conditions)
+                missing_query = {
+                    "date": check_date,
+                    "$or": missing_conditions
+                }
+
+                missing_count = rankings_collection.count_documents(missing_query)
+
+                # 详细统计每个字段的缺失情况
+                field_stats = {}
+                total_items = rankings_collection.count_documents({"date": check_date})
+
+                for field in key_fields:
+                    missing_field_count = rankings_collection.count_documents({
+                        "date": check_date,
+                        "$or": [
+                            {field: {"$exists": False}},
+                            {field: None},
+                            {field: ""}
+                        ]
+                    })
+                    field_stats[field] = {
+                        "missing": missing_field_count,
+                        "completion_rate": ((total_items - missing_field_count) / total_items * 100) if total_items > 0 else 0
+                    }
+
+                if missing_count > 0:
+                    logging.info(f"📅 今日({check_date}): 发现 {missing_count} 条记录缺失字段(总计 {total_items} 条)")
+
+                    # 输出详细的字段统计
+                    for field, stats in field_stats.items():
+                        if stats["missing"] > 0:
+                            logging.info(f"   - {field}: 缺失 {stats['missing']} 条 ({stats['completion_rate']:.1f}% 完整)")
+
+                    total_missing += missing_count
+
+                    # 尝试同步
+                    try:
+                        from routers.rank_api_routes import sync_ranking_storage_fields
+
+                        # 使用改进的重试机制
+                        sync_result = 
sync_ranking_storage_fields( + target_date=check_date, + force_update=False, + max_retries=2, # 定期检查时重试2次 + retry_delay=15 # 15秒重试间隔 + ) + + if sync_result.get("success", False): + stats = sync_result.get("stats", {}) + synced = stats.get("updated_items", 0) + retry_count = stats.get("retry_count", 0) + pending_final = stats.get("pending_items_final", 0) + + total_synced += synced + if synced > 0: + logging.info(f"✅ 今日({check_date}): 成功同步 {synced} 条记录") + + if retry_count > 0: + logging.info(f"🔄 今日({check_date}): 使用了 {retry_count} 次重试") + + if pending_final > 0: + logging.warning(f"⚠️ 今日({check_date}): {pending_final} 条记录在 Rankings_management 中仍未找到") + else: + logging.warning(f"⚠️ 今日({check_date}): 同步失败 - {sync_result.get('message', '')}") + + except Exception as sync_error: + logging.error(f"💥 今日({check_date}): 同步过程出错 - {sync_error}") + else: + if total_items > 0: + logging.info(f"📅 {check_date}: 所有字段完整(总计 {total_items} 条记录)") + # 显示完整性统计 + for field, stats in field_stats.items(): + logging.info(f" - {field}: {stats['completion_rate']:.1f}% 完整") + else: + logging.info(f"📅 {check_date}: 无数据") + + if total_missing > 0: + logging.info(f"🔍 当天同步完成:发现 {total_missing} 条缺失记录,成功同步 {total_synced} 条") + print(f"🔍 当天字段同步:发现 {total_missing} 条缺失,同步 {total_synced} 条") + else: + # 当天没有缺失数据时,不输出日志(静默模式) + pass + + except Exception as e: + logging.error(f"💥 检查缺失字段时发生异常: {e}") + import traceback + logging.error(f"详细错误信息: {traceback.format_exc()}") def setup_schedule(self): """设置定时任务""" # 每小时的整点执行抖音播放量抓取 schedule.every().hour.at(":00").do(self.run_douyin_scraper) + + # 每1分钟检查一次缺失字段并尝试同步(实时同步) + schedule.every(1).minutes.do(self.check_and_sync_missing_fields) logging.info(f"⏰ 定时器已设置:每小时整点执行抖音播放量抓取") + logging.info(f"⏰ 定时器已设置:每1分钟检查缺失字段并同步(实时模式)") def show_next_run(self): """显示下次执行时间""" diff --git a/backend/handlers/Rankings/episode_video_ids/video_ids_7486423445933951028.json b/backend/handlers/Rankings/episode_video_ids/video_ids_7486423445933951028.json index 5291f84..619bea2 100644 --- a/backend/handlers/Rankings/episode_video_ids/video_ids_7486423445933951028.json +++ b/backend/handlers/Rankings/episode_video_ids/video_ids_7486423445933951028.json @@ -143,9 +143,13 @@ { "video_id": "7558378239337467174", "episode_num": 0 + }, + { + "video_id": "7567050545257516331", + "episode_num": 0 } ], - "total_count": 36, - "last_update": "2025-10-22T09:55:32.073567", + "total_count": 37, + "last_update": "2025-10-31T09:50:18.533027", "mix_name": "末世系列" } \ No newline at end of file diff --git a/backend/handlers/Rankings/rank_data_scraper.py b/backend/handlers/Rankings/rank_data_scraper.py index 6c7bf68..f0f723a 100644 --- a/backend/handlers/Rankings/rank_data_scraper.py +++ b/backend/handlers/Rankings/rank_data_scraper.py @@ -109,25 +109,41 @@ class DouyinPlayVVScraper: # 根据运行模式选择集合 is_timer_mode = os.environ.get('TIMER_MODE') == '1' - mongo_collection = 'Ranking_storage_list' if is_timer_mode else 'Rankings_list' + mongo_collection = 'Ranking_storage_list' if is_timer_mode else 'Rankings_management' self.collection = self.db[mongo_collection] - logging.info(f'MongoDB连接成功,使用数据库: {self.db.name},集合: {mongo_collection}') + # 新增:设置Rankings_management集合(每天替换的数据库) + self.management_collection = self.db['Rankings_management'] + + logging.info(f'MongoDB连接成功,使用数据库: {self.db.name}') + logging.info(f'主集合: {mongo_collection}(只增不删)') + logging.info(f'管理集合: Rankings_management(每天替换)') logging.info(f'当前运行模式: {"定时器模式" if is_timer_mode else "普通模式"}') except Exception as e: - logging.error(f'MongoDB连接失败: {e}') + import traceback + 
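The structured `error_details` block introduced in this handler recurs near-verbatim in a dozen or more `except` clauses throughout this diff. A shared helper would keep each hunk to a single call; a minimal sketch, assuming a hypothetical `log_exception` helper that is not part of this patch:

```python
import logging
import traceback

def log_exception(e: Exception, context: str, level: int = logging.ERROR) -> dict:
    """Log type, message, traceback and context for an exception.

    Mirrors the error_details pattern repeated in this diff. Must be called
    from inside an except block so traceback.format_exc() sees the active
    exception.
    """
    error_details = {
        'error_type': type(e).__name__,
        'error_message': str(e),
        'traceback': traceback.format_exc(),
        'context': context,
    }
    logging.log(level, '%s: %s - %s', context,
                error_details['error_type'], error_details['error_message'])
    logging.log(level, '详细错误信息: %s', error_details['traceback'])
    return error_details
```

Each handler would then reduce to `log_exception(e, 'MongoDB连接设置')`, or `log_exception(e, ..., level=logging.WARNING)` at the warning-level sites.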
error_details = { + 'error_type': type(e).__name__, + 'error_message': str(e), + 'traceback': traceback.format_exc(), + 'context': 'MongoDB连接设置' + } + logging.error(f'MongoDB连接失败: {error_details["error_type"]} - {error_details["error_message"]}') + logging.error(f'详细错误信息: {error_details["traceback"]}') + logging.error(f'错误上下文: {error_details["context"]}') self.db = None self.collection = None + self.management_collection = None def _load_image_cache(self): """从数据库加载已存在的图片ID到TOS链接的映射""" - if self.collection is None: + target_collection = self.collection # 使用根据模式选择的集合 + if target_collection is None: return try: # 查询所有有封面图片的记录 - cursor = self.collection.find( + cursor = target_collection.find( { 'cover_image_url_original': {'$exists': True, '$ne': ''}, 'cover_image_url': {'$exists': True, '$ne': ''} @@ -150,7 +166,16 @@ class DouyinPlayVVScraper: logging.info(f'从数据库加载图片缓存: {cache_count} 个图片映射') except Exception as e: - logging.error(f'加载图片缓存失败: {e}') + import traceback + error_details = { + 'error_type': type(e).__name__, + 'error_message': str(e), + 'traceback': traceback.format_exc(), + 'context': '从数据库加载图片缓存' + } + logging.error(f'加载图片缓存失败: {error_details["error_type"]} - {error_details["error_message"]}') + logging.error(f'详细错误信息: {error_details["traceback"]}') + logging.error(f'错误上下文: {error_details["context"]}') def _cleanup_old_profiles(self): """清理超过一天的旧临时Chrome配置文件""" @@ -177,7 +202,16 @@ class DouyinPlayVVScraper: # 如果无法解析时间戳,跳过 continue except Exception as e: - logging.warning(f'清理旧配置文件时出错: {e}') + import traceback + error_details = { + 'error_type': type(e).__name__, + 'error_message': str(e), + 'traceback': traceback.format_exc(), + 'context': '清理超过一天的旧临时Chrome配置文件' + } + logging.warning(f'清理旧配置文件时出错: {error_details["error_type"]} - {error_details["error_message"]}') + logging.warning(f'详细错误信息: {error_details["traceback"]}') + logging.warning(f'错误上下文: {error_details["context"]}') def _cleanup_chrome_processes(self): """清理可能占用配置文件的Chrome进程""" @@ -401,7 +435,16 @@ class DouyinPlayVVScraper: else: logging.info(f'候选路径不存在: {p}') except Exception as e: - logging.warning(f'尝试使用 {p} 启动失败: {e}') + import traceback + error_details = { + 'error_type': type(e).__name__, + 'error_message': str(e), + 'traceback': traceback.format_exc(), + 'context': f'尝试使用ChromeDriver路径: {p}' + } + logging.warning(f'尝试使用 {p} 启动失败: {error_details["error_type"]} - {error_details["error_message"]}') + logging.warning(f'详细错误信息: {error_details["traceback"]}') + logging.warning(f'错误上下文: {error_details["context"]}') if not driver_ready: # 最终回退:使用webdriver-manager(可能需要网络) @@ -411,7 +454,17 @@ class DouyinPlayVVScraper: driver_ready = True logging.info('使用webdriver-manager成功启动ChromeDriver') except Exception as e: - raise RuntimeError('未能启动ChromeDriver。请手动下载匹配版本的chromedriver到项目根目录或PATH,或检查网络以允许webdriver-manager下载。错误: ' + str(e)) + import traceback + error_details = { + 'error_type': type(e).__name__, + 'error_message': str(e), + 'traceback': traceback.format_exc(), + 'context': '使用webdriver-manager启动ChromeDriver' + } + logging.error(f'webdriver-manager启动失败: {error_details["error_type"]} - {error_details["error_message"]}') + logging.error(f'详细错误信息: {error_details["traceback"]}') + logging.error(f'错误上下文: {error_details["context"]}') + raise RuntimeError(f'未能启动ChromeDriver。请手动下载匹配版本的chromedriver到项目根目录或PATH,或检查网络以允许webdriver-manager下载。错误类型: {error_details["error_type"]}, 错误信息: {error_details["error_message"]}') # 反检测 try: @@ -452,7 +505,16 @@ class DouyinPlayVVScraper: time.sleep(3) # 等待页面加载 
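The driver-startup hunk above walks a list of candidate chromedriver paths before falling back to webdriver-manager. A condensed sketch of that fallback strategy, with placeholder candidate paths rather than the ones the scraper actually probes:

```python
import os
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

def start_chrome(options, candidates=('./chromedriver', '/usr/local/bin/chromedriver')):
    """Try local chromedriver binaries first, then webdriver-manager."""
    for path in candidates:
        if not os.path.exists(path):
            continue  # mirrors the '候选路径不存在' log in the diff
        try:
            return webdriver.Chrome(service=Service(path), options=options)
        except Exception:
            continue  # broken binary or version mismatch: try the next candidate
    try:
        # Last resort: may download a driver, so it needs network access
        return webdriver.Chrome(service=Service(ChromeDriverManager().install()),
                                options=options)
    except Exception as e:
        raise RuntimeError(f'未能启动ChromeDriver: {type(e).__name__} - {e}') from e
```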
logging.info("自动模式:假设登录成功,继续执行...") except Exception as e: - logging.warning(f"自动模式导航失败: {e},继续执行...") + import traceback + error_details = { + 'error_type': type(e).__name__, + 'error_message': str(e), + 'traceback': traceback.format_exc(), + 'context': f'自动模式导航到起始URL: {self.start_url}' + } + logging.warning(f"自动模式导航失败: {error_details['error_type']} - {error_details['error_message']},继续执行...") + logging.warning(f'详细错误信息: {error_details["traceback"]}') + logging.warning(f'错误上下文: {error_details["context"]}') return logging.info("进入手动登录确认循环...") @@ -746,10 +808,12 @@ class DouyinPlayVVScraper: elif isinstance(pic, str): cover_image_url = pic - # 提取新增的三个字段 + # 提取新增的五个字段 series_author = "" desc = "" updated_to_episode = 0 + manufacturing_field = "" # 承制信息 + copyright_field = "" # 版权信息 # 提取合集作者/影视工作室 if 'author' in obj: @@ -981,7 +1045,16 @@ class DouyinPlayVVScraper: try: logs = self.driver.get_log('performance') except Exception as e: - logging.warning(f'获取性能日志失败: {e}') + import traceback + error_details = { + 'error_type': type(e).__name__, + 'error_message': str(e), + 'traceback': traceback.format_exc(), + 'context': '获取Chrome性能日志' + } + logging.warning(f'获取性能日志失败: {error_details["error_type"]} - {error_details["error_message"]}') + logging.warning(f'详细错误信息: {error_details["traceback"]}') + logging.warning(f'错误上下文: {error_details["context"]}') time.sleep(1) continue @@ -1104,12 +1177,13 @@ class DouyinPlayVVScraper: def update_ranks_for_batch(self): """为当前批次的数据更新排名""" - if self.collection is None or not self.saved_items: + target_collection = self.collection # 使用根据模式选择的集合 + if target_collection is None or not self.saved_items: return try: # 获取当前批次的所有数据,按播放量排序 - cursor = self.collection.find( + cursor = target_collection.find( {'batch_id': self.batch_id}, {'_id': 1, 'play_vv': 1, 'mix_name': 1} ).sort('play_vv', -1) @@ -1132,7 +1206,7 @@ class DouyinPlayVVScraper: ) if bulk_operations: - result = self.collection.bulk_write(bulk_operations) + result = target_collection.bulk_write(bulk_operations) logging.info(f'[实时保存] 成功更新 {result.modified_count} 个合集的排名') # 输出排名统计 @@ -1354,7 +1428,8 @@ class DouyinPlayVVScraper: logging.info(f'开始获取合集 {mix_name} 的视频详细互动数据') video_details_list = self.get_collection_video_details( episode_video_ids=episode_video_ids, - mix_name=mix_name + mix_name=mix_name, + mix_id=mix_id ) # 构建每集的详细信息,使用获取到的真实数据 @@ -1441,7 +1516,9 @@ class DouyinPlayVVScraper: 'desc': item.get('desc', ''), # 合集描述 'updated_to_episode': item.get('updated_to_episode', 0), # 合集总集数 'episode_video_ids': episode_video_ids, # 每一集的视频ID列表 - 'episode_details': episode_details # 每集的详细信息 + 'episode_details': episode_details, # 每集的详细信息 + 'Manufacturing_Field': item.get('Manufacturing_Field', ''), # 承制信息 + 'Copyright_field': item.get('Copyright_field', ''), # 版权信息 } documents.append(doc) @@ -1450,8 +1527,9 @@ class DouyinPlayVVScraper: for i, doc in enumerate(documents, 1): doc['rank'] = i - # 批量插入 - result = self.collection.insert_many(documents) + # 批量插入到目标集合(根据模式选择) + target_collection = self.collection # 使用根据模式选择的集合 + result = target_collection.insert_many(documents) logging.info(f'成功保存 {len(result.inserted_ids)} 条记录到MongoDB') # 输出统计信息 @@ -1459,7 +1537,7 @@ class DouyinPlayVVScraper: max_play_vv = max(doc['play_vv'] for doc in documents) if documents else 0 logging.info(f'MongoDB保存统计: 总播放量={total_play_vv:,}, 最高播放量={max_play_vv:,}') - logging.info(f'保存的字段: batch_time, mix_name, video_url, playcount, play_vv, request_id, rank, cover_image_url_original, cover_image_url, cover_upload_success, series_author, 
desc, updated_to_episode') + logging.info(f'保存的字段: batch_time, mix_name, video_url, playcount, play_vv, request_id, rank, cover_image_url_original, cover_image_url, cover_upload_success, series_author, Manufacturing_Field, Copyright_field, desc, updated_to_episode') # 统计封面图片处理情况 cover_count = sum(1 for doc in documents if doc.get('cover_image_url')) @@ -1485,11 +1563,12 @@ class DouyinPlayVVScraper: return None try: - # 生成唯一标识用于去重 - item_key = f"{item_data.get('mix_id', '')}_{item_data.get('play_vv', 0)}" + # 生成唯一标识用于去重(只使用mix_id,不包含播放量) + mix_id = item_data.get('mix_id', '') + item_key = mix_id if item_key in self.saved_items: - logging.warning(f'[立即保存] 合集数据已存在,跳过保存: {item_data.get("mix_name", "")}') + logging.warning(f'[立即保存] 短剧已存在,跳过重复保存: {item_data.get("mix_name", "")} (mix_id: {mix_id})') return None # 增加序号 @@ -1546,6 +1625,8 @@ class DouyinPlayVVScraper: 'cover_upload_success': upload_success, 'cover_backup_urls': item_data.get('cover_backup_urls', []), 'series_author': item_data.get('series_author', ''), + 'Manufacturing_Field': item_data.get('Manufacturing_Field', ''), + 'Copyright_field': item_data.get('Copyright_field', ''), 'desc': item_data.get('desc', ''), 'updated_to_episode': current_episode_count, 'episode_video_ids': [], # 稍后更新 @@ -1556,14 +1637,158 @@ class DouyinPlayVVScraper: 'last_updated': datetime.now() } - # 插入文档 - result = self.collection.insert_one(doc) - document_id = result.inserted_id + # 根据运行模式选择数据库集合 + target_collection = self.collection # 使用根据模式选择的集合 + document_id = None + + # 保存到目标数据库(根据模式:定时器模式->Ranking_storage_list,普通模式->Rankings_management) + if target_collection is not None: + try: + # 为目标数据库准备文档数据 + target_doc = doc.copy() + target_doc['last_updated'] = datetime.now() + + # 检查是否已存在该短剧的记录 + existing_doc = target_collection.find_one({'mix_id': mix_id}) + + # 准备更新字段(不包含锁定字段,锁定字段将在后面单独处理) + set_fields = { + # 按照用户指定的字段顺序设置 + 'batch_id': target_doc.get('batch_id', ''), + 'batch_time': target_doc.get('batch_time', datetime.now()), + 'item_sequence': target_doc.get('item_sequence', 0), + 'mix_name': target_doc.get('mix_name', ''), + 'mix_id': mix_id, + 'video_url': target_doc.get('video_url', ''), + 'playcount': target_doc.get('playcount', ''), + 'play_vv': target_doc.get('play_vv', 0), + 'request_id': target_doc.get('request_id', ''), + 'rank': target_doc.get('rank', 0), + 'cover_image_url_original': target_doc.get('cover_image_url_original', ''), + 'cover_image_url': target_doc.get('cover_image_url', ''), + 'cover_upload_success': target_doc.get('cover_upload_success', True), + 'cover_backup_urls': target_doc.get('cover_backup_urls', []), + 'series_author': target_doc.get('series_author', ''), + 'desc': target_doc.get('desc', ''), + 'updated_to_episode': target_doc.get('updated_to_episode', 0), + 'episode_video_ids': target_doc.get('episode_video_ids', []), + 'episode_details': target_doc.get('episode_details', []), + 'data_status': target_doc.get('data_status', ''), + 'realtime_saved': target_doc.get('realtime_saved', True), + 'created_at': target_doc.get('created_at', datetime.now()), + 'last_updated': target_doc['last_updated'], + # 新增:内容分类字段(存储短剧ID列表) + 'Novel_IDs': target_doc.get('Novel_IDs', []), + 'Anime_IDs': target_doc.get('Anime_IDs', []), + 'Drama_IDs': target_doc.get('Drama_IDs', []) + } + + # 锁定字段保护逻辑:Manufacturing_Field 和 Copyright_field + # 规则:如果现有记录中这些字段有值,则跳过更新(保持原值) + # 如果现有记录中这些字段为空,且新数据有值,则更新 + # 如果是新记录,则使用新数据的值 + if existing_doc: + # 记录已存在,检查锁定字段保护 + existing_manufacturing = existing_doc.get('Manufacturing_Field', '') + 
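The five near-identical protection blocks that follow (Manufacturing_Field, Copyright_field, Novel_IDs, Anime_IDs, Drama_IDs) all implement the same rule: keep a non-empty stored value, fill an empty one only when the new data is non-empty, and leave the field unset when both are empty. The per-field branches could be collapsed into one data-driven loop; a sketch, assuming a hypothetical `apply_locked_fields` helper:

```python
import logging

# Field name -> empty value of the matching type
LOCKED_FIELDS = {
    'Manufacturing_Field': '',
    'Copyright_field': '',
    'Novel_IDs': [],
    'Anime_IDs': [],
    'Drama_IDs': [],
}

def apply_locked_fields(existing_doc, target_doc, set_fields, mix_name=''):
    """Apply the lock-protection rule to every guarded field."""
    for field, empty in LOCKED_FIELDS.items():
        existing_value = existing_doc.get(field, empty)
        new_value = target_doc.get(field, empty)
        if existing_value:
            # Non-empty stored value wins: do not add the field to set_fields
            logging.info(f'[锁定字段] 跳过{field}更新: {mix_name} -> 保持现有值 {existing_value!r}')
        elif new_value:
            set_fields[field] = new_value
            logging.info(f'[锁定字段] 更新{field}: {mix_name} -> {new_value!r}')
        # Both empty: leave the field unset so the stored value stays empty
```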
existing_copyright = existing_doc.get('Copyright_field', '') + existing_novel_ids = existing_doc.get('Novel_IDs', []) + existing_anime_ids = existing_doc.get('Anime_IDs', []) + existing_drama_ids = existing_doc.get('Drama_IDs', []) + + new_manufacturing = target_doc.get('Manufacturing_Field', '') + new_copyright = target_doc.get('Copyright_field', '') + new_novel_ids = target_doc.get('Novel_IDs', []) + new_anime_ids = target_doc.get('Anime_IDs', []) + new_drama_ids = target_doc.get('Drama_IDs', []) + + # Manufacturing_Field 保护逻辑 + if existing_manufacturing: + # 现有字段有值,跳过更新(不添加到set_fields中) + logging.info(f'[锁定字段] 跳过Manufacturing_Field更新: {mix_name} -> 保持现有值 "{existing_manufacturing}"') + elif new_manufacturing: + # 现有字段为空,且新数据有值,则更新 + set_fields['Manufacturing_Field'] = new_manufacturing + logging.info(f'[锁定字段] 更新Manufacturing_Field: {mix_name} -> "{new_manufacturing}"') + # 如果现有为空且新数据也为空,则不设置该字段(保持为空) + + # Copyright_field 保护逻辑 + if existing_copyright: + # 现有字段有值,跳过更新(不添加到set_fields中) + logging.info(f'[锁定字段] 跳过Copyright_field更新: {mix_name} -> 保持现有值 "{existing_copyright}"') + elif new_copyright: + # 现有字段为空,且新数据有值,则更新 + set_fields['Copyright_field'] = new_copyright + logging.info(f'[锁定字段] 更新Copyright_field: {mix_name} -> "{new_copyright}"') + # 如果现有为空且新数据也为空,则不设置该字段(保持为空) + + # Novel_IDs 保护逻辑 + if existing_novel_ids and len(existing_novel_ids) > 0: + # 现有字段有值,跳过更新(不添加到set_fields中) + logging.info(f'[锁定字段] 跳过Novel_IDs更新: {mix_name} -> 保持现有值 {existing_novel_ids}') + elif new_novel_ids and len(new_novel_ids) > 0: + # 现有字段为空,且新数据有值,则更新 + set_fields['Novel_IDs'] = new_novel_ids + logging.info(f'[锁定字段] 更新Novel_IDs: {mix_name} -> {new_novel_ids}') + # 如果现有为空且新数据也为空,则不设置该字段(保持为空) + + # Anime_IDs 保护逻辑 + if existing_anime_ids and len(existing_anime_ids) > 0: + # 现有字段有值,跳过更新(不添加到set_fields中) + logging.info(f'[锁定字段] 跳过Anime_IDs更新: {mix_name} -> 保持现有值 {existing_anime_ids}') + elif new_anime_ids and len(new_anime_ids) > 0: + # 现有字段为空,且新数据有值,则更新 + set_fields['Anime_IDs'] = new_anime_ids + logging.info(f'[锁定字段] 更新Anime_IDs: {mix_name} -> {new_anime_ids}') + # 如果现有为空且新数据也为空,则不设置该字段(保持为空) + + # Drama_IDs 保护逻辑 + if existing_drama_ids and len(existing_drama_ids) > 0: + # 现有字段有值,跳过更新(不添加到set_fields中) + logging.info(f'[锁定字段] 跳过Drama_IDs更新: {mix_name} -> 保持现有值 {existing_drama_ids}') + elif new_drama_ids and len(new_drama_ids) > 0: + # 现有字段为空,且新数据有值,则更新 + set_fields['Drama_IDs'] = new_drama_ids + logging.info(f'[锁定字段] 更新Drama_IDs: {mix_name} -> {new_drama_ids}') + # 如果现有为空且新数据也为空,则不设置该字段(保持为空) + + else: + # 新记录,使用新数据的值(可能为空) + set_fields['Manufacturing_Field'] = target_doc.get('Manufacturing_Field', '') + set_fields['Copyright_field'] = target_doc.get('Copyright_field', '') + set_fields['Novel_IDs'] = target_doc.get('Novel_IDs', []) + set_fields['Anime_IDs'] = target_doc.get('Anime_IDs', []) + set_fields['Drama_IDs'] = target_doc.get('Drama_IDs', []) + logging.info(f'[锁定字段] 新记录,设置初始锁定字段: {mix_name}') + + # 使用upsert操作:如果存在则更新,不存在则插入 + upsert_result = target_collection.update_one( + {'mix_id': mix_id}, # 查询条件 + { + '$set': set_fields, + '$setOnInsert': { + # 只在插入时设置的字段(如果字段已在$set中,则不需要在这里重复) + } + }, + upsert=True # 如果不存在则插入 + ) + + if upsert_result.upserted_id: + # 新插入的文档 + document_id = upsert_result.upserted_id + logging.info(f'[数据保存] ✅ 新短剧添加: {mix_name} - 文档ID: {document_id}') + else: + # 更新的现有文档 + existing_doc = target_collection.find_one({'mix_id': mix_id}, {'_id': 1}) + document_id = existing_doc['_id'] if existing_doc else None + logging.info(f'[数据保存] 🔄 已有短剧更新: {mix_name} - 文档ID: {document_id}') + + except 
Exception as e: + logging.error(f'[数据保存] 目标数据库操作失败: {mix_name} - 错误: {e}') # 记录已保存的项目 self.saved_items.add(item_key) - logging.info(f'[立即保存] ✅ 成功保存合集基础信息: {mix_name} (播放量: {item_data.get("play_vv", 0):,}) - 文档ID: {document_id}') + logging.info(f'[数据保存] 🎯 合集基础信息保存完成: {mix_name} (播放量: {item_data.get("play_vv", 0):,})') return document_id @@ -1573,7 +1798,8 @@ class DouyinPlayVVScraper: def update_collection_video_ids(self, document_id, mix_id: str, mix_name: str, current_episode_count: int): """更新合集的视频ID列表(第二阶段更新)""" - if not self.realtime_save_enabled or self.collection is None or not document_id: + target_collection = self.collection # 使用根据模式选择的集合 + if not self.realtime_save_enabled or target_collection is None or not document_id: return False try: @@ -1587,23 +1813,29 @@ class DouyinPlayVVScraper: ) if episode_video_ids: - # 更新数据库中的视频ID列表 - update_result = self.collection.update_one( - {'_id': document_id}, - { - '$set': { - 'episode_video_ids': episode_video_ids, - 'data_status': 'video_ids_updated', - 'last_updated': datetime.now() - } + # 管理数据库更新逻辑 + update_data = { + '$set': { + 'episode_video_ids': episode_video_ids, + 'data_status': 'video_ids_updated', + 'last_updated': datetime.now() } - ) + } - if update_result.modified_count > 0: - logging.info(f'[增量更新] ✅ 成功更新视频ID列表: {mix_name} - 共 {len(episode_video_ids)} 个视频') - return episode_video_ids - else: - logging.warning(f'[增量更新] 更新视频ID列表失败: {mix_name}') + # 更新目标数据库 + try: + # 根据mix_id查找目标数据库中的文档 + update_result = target_collection.update_one( + {'mix_id': mix_id}, + update_data + ) + if update_result.modified_count > 0: + logging.info(f'[数据更新] ✅ 视频ID列表更新完成: {mix_name} - 共 {len(episode_video_ids)} 个视频') + return episode_video_ids + else: + logging.warning(f'[数据更新] 视频ID列表更新失败: {mix_name}') + except Exception as e: + logging.error(f'[数据更新] 视频ID更新失败: {mix_name} - 错误: {e}') else: logging.warning(f'[增量更新] 未获取到视频ID: {mix_name}') @@ -1615,7 +1847,15 @@ class DouyinPlayVVScraper: def update_single_video_details(self, document_id, episode_number: int, video_id: str, video_details: dict, mix_name: str): """更新单个视频的详细数据(第三阶段增量更新)""" - if not self.realtime_save_enabled or self.collection is None or not document_id: + target_collection = self.collection # 使用根据模式选择的集合 + if not self.realtime_save_enabled or target_collection is None or not document_id: + return False + + # 确保 episode_number 是整数类型 + try: + episode_number = int(episode_number) + except (ValueError, TypeError): + logging.error(f'update_single_video_details: episode_number 类型转换失败: {episode_number}, 类型: {type(episode_number)}') return False try: @@ -1633,31 +1873,50 @@ class DouyinPlayVVScraper: 'data_status': 'completed' } - # 更新数据库中对应集数的详细信息 - update_result = self.collection.update_one( - {'_id': document_id}, - { - '$set': { - f'episode_details.{episode_number - 1}': episode_info, - 'last_updated': datetime.now() - } + # 双数据库更新逻辑 + update_data = { + '$set': { + f'episode_details.{episode_number - 1}': episode_info, + 'last_updated': datetime.now() } - ) + } - if update_result.modified_count > 0: - logging.info(f'[增量更新] ✅ 成功更新第 {episode_number} 集详细数据: {mix_name} - 点赞: {video_details.get("likes", 0):,}, 评论: {len(video_details.get("comments", []))}') - return True + # 更新目标数据库 + if target_collection is not None: + try: + # 直接使用document_id查找目标数据库中的文档 + update_result = target_collection.update_one( + {'_id': document_id}, + update_data + ) + if update_result.modified_count > 0: + logging.info(f'[数据更新] ✅ 第 {episode_number} 集详细数据更新完成: {mix_name} - 点赞: {video_details.get("likes", 0):,}, 评论: 
{len(video_details.get("comments", []))}') + return True + else: + logging.warning(f'[数据更新] 第 {episode_number} 集详细数据更新失败: {mix_name}') + return False + except Exception as e: + logging.error(f'[数据更新] 第 {episode_number} 集详细数据更新失败: {mix_name} - 错误: {e}') + return False else: - logging.warning(f'[增量更新] 更新第 {episode_number} 集详细数据失败: {mix_name}') + logging.warning(f'[数据更新] 目标数据库第 {episode_number} 集详细数据更新失败: {mix_name}') return False except Exception as e: logging.error(f'[增量更新] 更新第 {episode_number} 集详细数据失败: {mix_name} - 错误: {e}') return False - def update_video_comments_realtime(self, document_id, episode_number: int, new_comments: list = None, mix_name: str = '', interaction_data: dict = None): + def update_video_comments_realtime(self, document_id, episode_number: int, new_comments: list = None, mix_name: str = '', mix_id: str = '', interaction_data: dict = None): """实时更新视频评论和互动数据(第四阶段实时更新)""" - if not self.realtime_save_enabled or self.collection is None or not document_id: + target_collection = self.collection # 使用根据模式选择的集合 + if not self.realtime_save_enabled or target_collection is None or not document_id: + return False + + # 确保 episode_number 是整数类型 + try: + episode_number = int(episode_number) + except (ValueError, TypeError): + logging.error(f'episode_number 类型转换失败: {episode_number}, 类型: {type(episode_number)}') return False # 检查是否有数据需要更新 @@ -1695,36 +1954,53 @@ class DouyinPlayVVScraper: update_operations['$set'] = set_fields - # 执行更新 - update_result = self.collection.update_one( - {'_id': document_id}, - update_operations - ) - - if update_result.modified_count > 0: - # 构建日志信息 - log_parts = [] - if new_comments: - log_parts.append(f"追加 {len(new_comments)} 条评论") - if interaction_data: - interaction_summary = [] - if 'likes' in interaction_data: - interaction_summary.append(f"点赞={interaction_data.get('likes_formatted', interaction_data['likes'])}") - if 'shares' in interaction_data: - interaction_summary.append(f"分享={interaction_data.get('shares_formatted', interaction_data['shares'])}") - if 'favorites' in interaction_data: - interaction_summary.append(f"收藏={interaction_data.get('favorites_formatted', interaction_data['favorites'])}") - if interaction_summary: - log_parts.append(f"更新互动数据({', '.join(interaction_summary)})") - - logging.info(f'[实时更新] ✅ 成功{", ".join(log_parts)}: {mix_name} 第 {episode_number} 集') - return True + # 目标数据库更新逻辑 + if target_collection is not None: + try: + # 直接使用document_id查找目标数据库中的文档 + update_result = target_collection.update_one( + {'_id': document_id}, + update_operations + ) + if update_result.modified_count > 0: + # 构建日志信息 + log_parts = [] + if new_comments: + log_parts.append(f"追加 {len(new_comments)} 条评论") + if interaction_data: + interaction_summary = [] + if 'likes' in interaction_data: + interaction_summary.append(f"点赞={interaction_data.get('likes_formatted', interaction_data['likes'])}") + if 'shares' in interaction_data: + interaction_summary.append(f"分享={interaction_data.get('shares_formatted', interaction_data['shares'])}") + if 'favorites' in interaction_data: + interaction_summary.append(f"收藏={interaction_data.get('favorites_formatted', interaction_data['favorites'])}") + if interaction_summary: + log_parts.append(f"更新互动数据({', '.join(interaction_summary)})") + + logging.info(f'[目标数据库] ✅ 第 {episode_number} 集评论/互动数据更新完成: {mix_name} - {", ".join(log_parts)}') + return True + else: + logging.warning(f'[目标数据库] 第 {episode_number} 集评论/互动数据更新失败: {mix_name}') + return False + except Exception as e: + logging.error(f'[目标数据库] 第 {episode_number} 
集评论/互动数据更新失败: {mix_name} - 错误: {e}') + return False else: - logging.warning(f'[实时更新] 更新失败: {mix_name} 第 {episode_number} 集') + logging.error(f'[目标数据库] 目标数据库未初始化') return False except Exception as e: - logging.error(f'[实时更新] 更新失败: {mix_name} 第 {episode_number} 集 - 错误: {e}') + import traceback + error_details = { + 'error_type': type(e).__name__, + 'error_message': str(e), + 'traceback': traceback.format_exc(), + 'context': f'实时更新视频评论,合集: {mix_name}, 第 {episode_number} 集, 文档ID: {document_id}, 新评论数: {len(new_comments) if new_comments else 0}' + } + logging.error(f'[实时更新] 更新失败: {mix_name} 第 {episode_number} 集 - {error_details["error_type"]}: {error_details["error_message"]}') + logging.error(f'详细错误信息: {error_details["traceback"]}') + logging.error(f'错误上下文: {error_details["context"]}') return False def save_single_item_realtime(self, item_data: dict): @@ -1746,11 +2022,48 @@ class DouyinPlayVVScraper: # 第三阶段:逐个获取并更新视频详细数据 if episode_video_ids: - self.update_video_details_incrementally(document_id, episode_video_ids, mix_name) + self.update_video_details_incrementally(document_id, episode_video_ids, mix_name, mix_id) + + # 🔄 第四阶段:触发字段同步到Ranking_storage(如果存在对应的榜单数据) + try: + if mix_name: # 只有当mix_name存在时才尝试同步 + logging.info(f'[字段同步] 检查是否需要同步字段到Ranking_storage: {mix_name}') + + # 导入同步函数(延迟导入避免循环依赖) + import sys + import os + sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'routers')) + from rank_api_routes import sync_ranking_storage_fields + + # 获取今天的日期 + today_str = datetime.now().strftime('%Y-%m-%d') + + # 检查Ranking_storage中是否存在该短剧的今日数据 + ranking_storage_collection = db['Ranking_storage'] + existing_ranking = ranking_storage_collection.find_one({ + "date": today_str, + "mix_name": mix_name + }) + + if existing_ranking: + # 存在对应的榜单数据,触发同步 + logging.info(f'[字段同步] 发现对应的榜单数据,开始同步: {mix_name}') + sync_result = sync_ranking_storage_fields(target_date=today_str, force_update=False) + + if sync_result.get("success", False): + logging.info(f'[字段同步] ✅ 同步成功: {sync_result.get("message", "")}') + else: + logging.info(f'[字段同步] ⚠️ 同步完成: {sync_result.get("message", "")}') + else: + logging.info(f'[字段同步] 未找到对应的榜单数据,跳过同步: {mix_name}') + + except Exception as sync_error: + logging.warning(f'[字段同步] 同步失败,但不影响数据保存: {mix_name} - {sync_error}') + # 同步失败不影响数据保存的成功状态 return True - def update_video_details_incrementally(self, document_id, episode_video_ids: list, mix_name: str): + def update_video_details_incrementally(self, document_id, episode_video_ids: list, mix_name: str, mix_id: str = ''): """增量更新视频详细数据""" logging.info(f'[增量更新] 开始逐个获取视频详细数据: {mix_name}') @@ -1762,7 +2075,7 @@ class DouyinPlayVVScraper: try: # 获取单个视频的详细数据 logging.info(f'[增量更新] 获取第 {i}/{len(episode_video_ids)} 集视频详细数据: {mix_name}') - video_details = self.get_video_details(video_id, mix_name, document_id, i) + video_details = self.get_video_details(video_id, mix_name, mix_id, document_id, i) if video_details and video_details.get('success', False): # 立即更新到数据库 @@ -2002,7 +2315,7 @@ class DouyinPlayVVScraper: return [] def _simulate_comment_scrolling(self, video_id: str, max_scroll_attempts: int = 10, scroll_delay: float = 2.0, - document_id=None, episode_number: int = 0, mix_name: str = '') -> list: + document_id=None, episode_number: int = 0, mix_name: str = '', mix_id: str = '') -> list: """ 模拟用户异步滑动机制,向上滑动加载更多评论 Args: @@ -2012,6 +2325,13 @@ class DouyinPlayVVScraper: Returns: list: 收集到的所有评论数据 """ + # 确保 episode_number 是整数类型 + try: + episode_number = int(episode_number) + except (ValueError, TypeError): + 
logging.error(f'_simulate_comment_scrolling: episode_number 类型转换失败: {episode_number}, 类型: {type(episode_number)}') + episode_number = 0 + # 检查是否应该跳过评论滑动(仅在定时器模式下跳过) if should_skip_function('scroll_comments'): logging.info(f'🚀 定时器模式:跳过视频 {video_id} 的评论滑动加载') @@ -2049,7 +2369,7 @@ class DouyinPlayVVScraper: # 同时提交监控任务 - 监控任务会检测滑动任务状态(5小时超时) monitor_future = executor.submit(self._async_monitor_task_with_state, video_id, collected_comment_ids, shared_state, 18000, - document_id, episode_number, mix_name) + document_id, episode_number, mix_name, mix_id) # 等待两个任务完成 scroll_result = scroll_future.result() @@ -2059,82 +2379,6 @@ class DouyinPlayVVScraper: logging.info(f'评论滑动加载完成,共收集到 {len(all_comments)} 条评论') - # 针对评论较少的情况,执行补丁机制确保不遗漏评论 - # 当滑动次数较少(可能只滑动了2-3次就到底)但评论数量也较少时, - # 很可能存在页面上可见但未被网络日志捕获的评论 - - # 智能判断是否需要执行补丁机制 - # 只在评论数量真正过少时才启用补丁机制 - should_apply_patch = False - - # 只有当评论数量少于10条时才启用补丁机制 - if len(all_comments) < 10: - should_apply_patch = True - logging.debug(f'评论数量过少({len(all_comments)}条),启用补丁机制') - - # 对于评论数量在10-50条之间的情况,检查是否可能遗漏了评论 - elif len(all_comments) <= 50: - try: - visible_comment_count = self.driver.execute_script(""" - var selectors = [ - '[data-e2e="comment-item"]', - '[class*="comment-item"]', - '[class*="comment-content"]' - ]; - var totalCount = 0; - selectors.forEach(function(selector) { - var elements = document.querySelectorAll(selector); - elements.forEach(function(element) { - if (element.offsetParent !== null && element.textContent.trim().length > 2) { - totalCount++; - } - }); - }); - return totalCount; - """) - - # 只有当页面可见评论数量明显大于已获取数量时才启用补丁 - if visible_comment_count > len(all_comments) * 2: - should_apply_patch = True - logging.debug(f'页面可见评论({visible_comment_count}条) >> 已获取评论({len(all_comments)}条),启用补丁机制') - - except Exception as e: - logging.debug(f'检查页面可见评论数量失败: {e}') - # 检查失败时不启用补丁机制 - - patch_comments = [] - if should_apply_patch: - logging.info('执行评论补丁机制...') - patch_comments = self._extract_comments_patch(video_id) - else: - logging.debug('无需执行补丁机制') - - if patch_comments: - # 去重合并补丁评论 - existing_texts = {comment.get('text', '') for comment in all_comments} - new_patch_comments = [] - - for patch_comment in patch_comments: - if patch_comment.get('text', '') not in existing_texts: - new_patch_comments.append(patch_comment) - existing_texts.add(patch_comment.get('text', '')) - - if new_patch_comments: - all_comments.extend(new_patch_comments) - logging.info(f'补丁机制额外获取到 {len(new_patch_comments)} 条评论,总计 {len(all_comments)} 条评论') - - # 如果有新的评论且启用了实时保存,更新数据库 - if document_id and episode_number and new_patch_comments: - try: - self.update_video_comments_realtime(document_id, episode_number, new_patch_comments, mix_name) - logging.info(f'实时保存补丁评论到数据库: {len(new_patch_comments)} 条') - except Exception as e: - logging.warning(f'实时保存补丁评论失败: {e}') - else: - logging.debug('补丁机制未发现新的评论') - else: - logging.debug('补丁机制未获取到任何评论') - # 保存评论到文件 if all_comments: self.save_comments_to_file(all_comments, video_id) @@ -2147,7 +2391,16 @@ class DouyinPlayVVScraper: return all_comments except Exception as e: - logging.error(f'评论滑动加载机制执行失败: {e}') + import traceback + error_details = { + 'error_type': type(e).__name__, + 'error_message': str(e), + 'traceback': traceback.format_exc(), + 'context': f'评论滑动加载机制,视频ID: {video_id}, 最大滑动次数: {max_scroll_attempts}' + } + logging.error(f'评论滑动加载机制执行失败: {error_details["error_type"]} - {error_details["error_message"]}') + logging.error(f'详细错误信息: {error_details["traceback"]}') + logging.error(f'错误上下文: {error_details["context"]}') return all_comments 
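For reference, the scroll/monitor pairing used in `_simulate_comment_scrolling` boils down to two futures coordinated through a shared dict; a stripped-down sketch of that coordination (the `scroll_done` key is an assumption about `shared_state`, whose construction is outside this hunk):

```python
from concurrent.futures import ThreadPoolExecutor

def collect_comments(scroll_task, monitor_task):
    """Run scrolling and network-log monitoring concurrently."""
    shared_state = {'scroll_done': False}
    collected_comment_ids = set()
    with ThreadPoolExecutor(max_workers=2) as executor:
        scroll_future = executor.submit(scroll_task, shared_state)
        monitor_future = executor.submit(monitor_task, collected_comment_ids, shared_state)
        scroll_future.result()              # block until scrolling reaches the bottom
        shared_state['scroll_done'] = True  # let the monitor wind down early
        return monitor_future.result()      # monitor returns the comments it drained
```

The shared flag is what lets the monitor stop shortly after scrolling finishes instead of waiting out its full 18000-second timeout.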
@@ -2410,14 +2663,30 @@ class DouyinPlayVVScraper: time.sleep(1) except Exception as e: - logging.warning(f'监控任务出错: {e}') + import traceback + error_details = { + 'error_type': type(e).__name__, + 'error_message': str(e), + 'traceback': traceback.format_exc(), + 'context': f'异步监控评论任务,视频ID: {video_id}, 超时时间: {timeout}秒' + } + logging.warning(f'监控任务出错: {error_details["error_type"]} - {error_details["error_message"]}') + logging.warning(f'详细错误信息: {error_details["traceback"]}') + logging.warning(f'错误上下文: {error_details["context"]}') time.sleep(2) return all_comments def _async_monitor_task_with_state(self, video_id: str, collected_comment_ids: set, shared_state: dict, timeout: float, - document_id=None, episode_number: int = 0, mix_name: str = '') -> list: + document_id=None, episode_number: int = 0, mix_name: str = '', mix_id: str = '') -> list: """带状态的异步监控任务 - 监控评论并检测滑动任务状态""" + # 确保 episode_number 是整数类型 + try: + episode_number = int(episode_number) + except (ValueError, TypeError): + logging.error(f'_async_monitor_task_with_state: episode_number 类型转换失败: {episode_number}, 类型: {type(episode_number)}') + episode_number = 0 + all_comments = [] start_time = time.time() last_comment_count = 0 @@ -2445,7 +2714,16 @@ class DouyinPlayVVScraper: all_comments.append(comment) time.sleep(0.5) except Exception as e: - logging.warning(f'最终监控阶段出错: {e}') + import traceback + error_details = { + 'error_type': type(e).__name__, + 'error_message': str(e), + 'traceback': traceback.format_exc(), + 'context': f'最终监控阶段,视频ID: {video_id}, 剩余监控时间: {5 - (time.time() - final_start):.1f}秒' + } + logging.warning(f'最终监控阶段出错: {error_details["error_type"]} - {error_details["error_message"]}') + logging.warning(f'详细错误信息: {error_details["traceback"]}') + logging.warning(f'错误上下文: {error_details["context"]}') break # 从网络日志获取新评论 @@ -2462,7 +2740,7 @@ class DouyinPlayVVScraper: # 实时保存新评论到数据库 if new_comments_to_save and document_id and episode_number > 0: - self.update_video_comments_realtime(document_id, episode_number, new_comments_to_save, mix_name) + self.update_video_comments_realtime(document_id, episode_number, new_comments_to_save, mix_name, mix_id) # 检查是否有新评论 current_comment_count = len(all_comments) @@ -2480,7 +2758,16 @@ class DouyinPlayVVScraper: time.sleep(1) except Exception as e: - logging.warning(f'监控任务出错: {e}') + import traceback + error_details = { + 'error_type': type(e).__name__, + 'error_message': str(e), + 'traceback': traceback.format_exc(), + 'context': f'带状态的异步监控评论任务,视频ID: {video_id}, 超时时间: {timeout}秒, 文档ID: {document_id}, 集数: {episode_number}' + } + logging.warning(f'监控任务出错: {error_details["error_type"]} - {error_details["error_message"]}') + logging.warning(f'详细错误信息: {error_details["traceback"]}') + logging.warning(f'错误上下文: {error_details["context"]}') time.sleep(2) logging.info(f'监控任务结束,共收集到 {len(all_comments)} 条评论') @@ -2585,7 +2872,16 @@ class DouyinPlayVVScraper: logging.debug(f'点击页面中部失败: {e}') except Exception as e: - logging.warning(f'点击评论区域失败: {e}') + import traceback + error_details = { + 'error_type': type(e).__name__, + 'error_message': str(e), + 'traceback': traceback.format_exc(), + 'context': f'点击评论区域,尝试激活评论加载' + } + logging.warning(f'点击评论区域失败: {error_details["error_type"]} - {error_details["error_message"]}') + logging.warning(f'详细错误信息: {error_details["traceback"]}') + logging.warning(f'错误上下文: {error_details["context"]}') def _check_comment_section_bottom(self) -> bool: """ @@ -2696,7 +2992,16 @@ class DouyinPlayVVScraper: return False except Exception as e: - logging.warning(f'检测评论区底部失败: 
{e}') + import traceback + error_details = { + 'error_type': type(e).__name__, + 'error_message': str(e), + 'traceback': traceback.format_exc(), + 'context': f'检测评论区底部,目标文本: "暂时没有更多评论"' + } + logging.warning(f'检测评论区底部失败: {error_details["error_type"]} - {error_details["error_message"]}') + logging.warning(f'详细错误信息: {error_details["traceback"]}') + logging.warning(f'错误上下文: {error_details["context"]}') return False def _extract_comments_from_network_logs(self, video_id: str) -> list: @@ -2757,17 +3062,33 @@ class DouyinPlayVVScraper: continue except Exception as e: - logging.warning(f'提取网络日志评论数据失败: {e}') + import traceback + error_details = { + 'error_type': type(e).__name__, + 'error_message': str(e), + 'traceback': traceback.format_exc(), + 'context': f'提取网络日志评论数据,视频ID: {video_id}, 已处理评论数: {len(comments)}' + } + logging.warning(f'提取网络日志评论数据失败: {error_details["error_type"]} - {error_details["error_message"]}') + logging.warning(f'详细错误信息: {error_details["traceback"]}') + logging.warning(f'错误上下文: {error_details["context"]}') return comments - def get_video_details(self, video_id: str, mix_name: str = '', document_id=None, episode_number: int = 0) -> dict: + def get_video_details(self, video_id: str, mix_name: str = '', mix_id: str = '', document_id=None, episode_number: int = 0) -> dict: """获取单个视频的详细互动数据 Args: video_id: 视频ID Returns: dict: 包含点赞数、收藏数、转发数、评论内容的字典 """ + # 确保 episode_number 是整数类型 + try: + episode_number = int(episode_number) + except (ValueError, TypeError): + logging.error(f'get_video_details: episode_number 类型转换失败: {episode_number}, 类型: {type(episode_number)}') + episode_number = 0 + video_details = { 'video_id': video_id, 'likes': 0, @@ -2873,7 +3194,7 @@ class DouyinPlayVVScraper: 'favorites': video_details['favorites'], 'favorites_formatted': video_details['favorites_formatted'] } - self.update_video_comments_realtime(document_id, episode_number, None, mix_name, interaction_data) + self.update_video_comments_realtime(document_id, episode_number, None, mix_name, mix_id, interaction_data) interaction_data_saved = True break @@ -2888,7 +3209,7 @@ class DouyinPlayVVScraper: # 启动滑动机制加载更多评论 logging.info(f'开始为视频 {video_id} 启动滑动机制加载评论') scrolled_comments = self._simulate_comment_scrolling(video_id, max_scroll_attempts=15, scroll_delay=2.0, - document_id=document_id, episode_number=episode_number, mix_name=mix_name) + document_id=document_id, episode_number=episode_number, mix_name=mix_name, mix_id=mix_id) # 如果滑动机制获取到评论,直接使用 if scrolled_comments: @@ -2952,8 +3273,17 @@ class DouyinPlayVVScraper: return video_details except Exception as e: - error_msg = f'获取视频 {video_id} 详细数据失败: {e}' + import traceback + error_details = { + 'error_type': type(e).__name__, + 'error_message': str(e), + 'traceback': traceback.format_exc(), + 'context': f'获取视频详细数据,视频ID: {video_id}' + } + error_msg = f'获取视频 {video_id} 详细数据失败: {error_details["error_type"]} - {error_details["error_message"]}' logging.error(error_msg) + logging.error(f'详细错误信息: {error_details["traceback"]}') + logging.error(f'错误上下文: {error_details["context"]}') video_details['error'] = error_msg return video_details @@ -3029,7 +3359,7 @@ class DouyinPlayVVScraper: 'favorites': video_details['favorites'], 'favorites_formatted': video_details['favorites_formatted'] } - self.update_video_comments_realtime(document_id, episode_number, None, mix_name, interaction_data) + self.update_video_comments_realtime(document_id, episode_number, None, mix_name, mix_id, interaction_data) interaction_data_saved = True break @@ -3094,11 +3424,20 @@ class 
DouyinPlayVVScraper: 'favorites': video_details['favorites'], 'favorites_formatted': video_details['favorites_formatted'] } - self.update_video_comments_realtime(document_id, episode_number, None, mix_name, interaction_data) + self.update_video_comments_realtime(document_id, episode_number, None, mix_name, mix_id, interaction_data) interaction_data_saved = True except Exception as e: - logging.warning(f'CSS选择器解析失败: {e}') + import traceback + error_details = { + 'error_type': type(e).__name__, + 'error_message': str(e), + 'traceback': traceback.format_exc(), + 'context': f'CSS选择器解析视频互动数据,视频ID: {video_id}' + } + logging.warning(f'CSS选择器解析失败: {error_details["error_type"]} - {error_details["error_message"]}') + logging.warning(f'详细错误信息: {error_details["traceback"]}') + logging.warning(f'错误上下文: {error_details["context"]}') # 尝试获取评论(如果还没有获取到) if not video_details['comments']: @@ -3146,7 +3485,7 @@ class DouyinPlayVVScraper: return video_details - def get_collection_video_details(self, episode_video_ids: list, mix_name: str = '') -> list: + def get_collection_video_details(self, episode_video_ids: list, mix_name: str = '', mix_id: str = '') -> list: """获取合集中所有视频的详细互动数据 Args: episode_video_ids: 视频ID列表 @@ -3186,7 +3525,7 @@ class DouyinPlayVVScraper: try: # 获取单个视频的详细数据 - video_details = self.get_video_details(video_id) + video_details = self.get_video_details(video_id, mix_name, '', 0, mix_id) video_details['episode_number'] = i video_details_list.append(video_details) @@ -3225,150 +3564,6 @@ class DouyinPlayVVScraper: self.cookies = {cookie['name']: cookie['value'] for cookie in self.driver.get_cookies()} return self.cookies - def _extract_comments_patch(self, video_id: str) -> list: - """ - 评论补丁机制 - 更仔细地重新从网络日志获取评论 - 不再抓取页面元素,而是重新触发评论加载并从API获取 - """ - comments = [] - try: - logging.info(f'启动补丁机制,重新仔细获取视频 {video_id} 的评论...') - - # 首先检查是否存在"抢首评"按钮,如果存在说明视频确实没有评论 - if self._check_first_comment_button(): - logging.info('检测到"抢首评"按钮,确认视频没有评论,跳过补丁机制') - return comments - - # 等待页面稳定 - time.sleep(2) - - # 滚动到评论区域确保评论完全加载 - self._scroll_to_comment_section() - time.sleep(1) - - # 点击评论区域,触发评论加载 - try: - self._click_comment_area() - time.sleep(1) - except: - pass - - # 清理旧的网络日志 - self.driver.get_log('performance') - - # 轻微滚动,触发更多评论加载 - for i in range(3): - self.driver.execute_script("window.scrollBy(0, 200);") - time.sleep(0.5) - self.driver.execute_script("window.scrollBy(0, -100);") - time.sleep(0.5) - - # 等待网络请求完成 - time.sleep(3) - - # 重新从网络日志中提取评论(更仔细的方式) - patch_comments = self._extract_comments_from_network_logs_detailed(video_id) - - if patch_comments: - logging.info(f'补丁机制成功重新获取 {len(patch_comments)} 条评论') - comments.extend(patch_comments) - else: - logging.info('补丁机制未找到额外评论') - - except Exception as e: - logging.error(f'评论补丁机制执行失败: {e}') - - return comments - - def _extract_comments_from_network_logs_detailed(self, video_id: str) -> list: - """ - 更详细地从网络日志中提取评论数据(补丁机制专用) - Args: - video_id: 视频ID - Returns: - list: 评论数据列表 - """ - comments = [] - try: - # 获取网络请求日志 - logs = self.driver.get_log('performance') - - for entry in logs: - try: - log = json.loads(entry['message'])['message'] - if ( - 'Network.responseReceived' in log['method'] - and 'response' in log['params'] - and log['params']['response'] - and log['params']['response'].get('url') - ): - url = log['params']['response']['url'] - - # 检查是否是评论相关的API(更宽泛的匹配) - comment_api_patterns = [ - '/aweme/v1/web/comment/list/', - '/comment/list/', - '/comment/detail/', - '/reply/list/' - ] - - is_comment_api = any(pattern in url for pattern in 
comment_api_patterns) - - if is_comment_api and video_id in url: - try: - # 获取响应体 - response_body = self.driver.execute_cdp_cmd( - 'Network.getResponseBody', - {'requestId': log['params']['requestId']} - ) - - if response_body and 'body' in response_body: - data = json.loads(response_body['body']) - - # 尝试多种可能的评论数据结构 - api_comments = [] - - # 标准结构 - if 'comments' in data: - api_comments = data['comments'] - # 备用结构 - elif 'comment_list' in data: - api_comments = data['comment_list'] - elif 'data' in data and isinstance(data['data'], list): - api_comments = data['data'] - elif 'data' in data and 'comments' in data['data']: - api_comments = data['data']['comments'] - - for comment in api_comments: - if isinstance(comment, dict): - comment_text = comment.get('text', '') or comment.get('content', '') - if comment_text and len(comment_text.strip()) > 0: - comment_info = { - 'text': comment_text.strip(), - 'user_name': comment.get('user', {}).get('nickname', '') if comment.get('user') else '', - 'digg_count': int(comment.get('digg_count', 0) or comment.get('like_count', 0)), - 'create_time': comment.get('create_time', 0) or comment.get('timestamp', 0), - 'source': 'patch_api' - } - comments.append(comment_info) - - # 记录API URL信息,用于调试 - if api_comments: - logging.debug(f'补丁机制从API获取到 {len(api_comments)} 条评论: {url}') - - except Exception as e: - logging.debug(f'补丁机制处理响应体失败: {e}') - continue - - except Exception as e: - logging.debug(f'补丁机制处理日志条目失败: {e}') - continue - - except Exception as e: - logging.error(f'补丁机制从网络日志提取评论失败: {e}') - - return comments - def _click_comment_area(self): """ 点击评论区域,触发评论加载 @@ -3492,8 +3687,48 @@ class DouyinPlayVVScraper: logging.debug(f'检测抢首评按钮时出错: {e}') return False + def cleanup_old_management_data(self, days_to_keep: int = 7): + """清理目标数据库中的旧数据,基于last_updated字段保留指定天数的数据""" + target_collection = self.collection # 使用根据模式选择的集合 + if target_collection is None: + logging.warning('[数据清理] 目标集合未初始化,跳过清理') + return False + + try: + # 计算需要保留的最早时间 + from datetime import timedelta + cutoff_datetime = datetime.now() - timedelta(days=days_to_keep) + + # 查询需要删除的数据数量(基于last_updated字段) + old_data_count = target_collection.count_documents({ + 'last_updated': {'$lt': cutoff_datetime} + }) + + if old_data_count == 0: + logging.info(f'[数据清理] 无需清理,没有超过{days_to_keep}天未更新的旧数据') + return True + + # 删除旧数据 + delete_result = target_collection.delete_many({ + 'last_updated': {'$lt': cutoff_datetime} + }) + + if delete_result.deleted_count > 0: + logging.info(f'[数据清理] ✅ 成功清理Rankings_management中{delete_result.deleted_count}条旧数据(保留最近{days_to_keep}天更新的数据)') + return True + else: + logging.warning(f'[数据清理] 清理操作未删除任何数据') + return False + + except Exception as e: + logging.error(f'[数据清理] 清理Rankings_management旧数据失败: {e}') + return False + def run(self): try: + # 在开始抓取前清理旧数据(保留最近7天) + self.cleanup_old_management_data(days_to_keep=7) + self.setup_driver() self.navigate() self.ensure_login() @@ -3503,6 +3738,18 @@ class DouyinPlayVVScraper: self.dedupe() self.save_results() logging.info('完成,play_vv数量: %d', len(self.play_vv_items)) + except Exception as e: + import traceback + error_details = { + 'error_type': type(e).__name__, + 'error_message': str(e), + 'traceback': traceback.format_exc(), + 'context': '执行抖音播放量抓取任务主流程' + } + logging.error(f'抓取任务执行失败: {error_details["error_type"]} - {error_details["error_message"]}') + logging.error(f'详细错误信息: {error_details["traceback"]}') + logging.error(f'错误上下文: {error_details["context"]}') + raise # 重新抛出异常,让上层调用者处理 finally: if self.driver: try: diff --git 
a/backend/routers/rank_api_routes.py b/backend/routers/rank_api_routes.py index ae62d27..10dc818 100644 --- a/backend/routers/rank_api_routes.py +++ b/backend/routers/rank_api_routes.py @@ -15,7 +15,8 @@ from database import db rank_bp = Blueprint('rank', __name__, url_prefix='/api/rank') # 获取数据库集合 -collection = db['Rankings_list'] +collection = db['Ranking_storage'] # 主要数据源:榜单存储表(包含data数组) +rankings_management_collection = db['Rankings_management'] # 管理数据库(字段同步源) daily_rankings_collection = db['Ranking_storage'] # 榜单存储表 def format_playcount(playcount_str): @@ -117,12 +118,72 @@ def sort_ranking_data(ranking_data, sort_by, sort_order='desc'): # 如果排序失败,返回原始数据 return ranking_data -def format_mix_item(doc): + + +def parse_formatted_count(formatted_str): + """解析格式化的数字字符串(如"1.2万"、"374W"等)""" + try: + if not formatted_str or formatted_str == "0": + return 0 + + formatted_str = str(formatted_str).strip() + + # 处理万、W等单位 + if "万" in formatted_str or "W" in formatted_str: + # 提取数字部分 + import re + numbers = re.findall(r'[\d.]+', formatted_str) + if numbers: + num = float(numbers[0]) + return int(num * 10000) + elif "亿" in formatted_str: + numbers = re.findall(r'[\d.]+', formatted_str) + if numbers: + num = float(numbers[0]) + return int(num * 100000000) + else: + # 尝试直接转换为数字 + return int(float(formatted_str)) + except: + return 0 + + return 0 + +def format_interaction_count(count): + """格式化互动数量为易读格式""" + try: + count = int(count) + if count >= 100000000: # 1亿+ + return f"{count / 100000000:.1f}亿" + elif count >= 10000: # 1万+ + return f"{count / 10000:.1f}万" + else: + return str(count) + except: + return "0" + +def format_mix_item(doc, target_date=None): """格式化合集数据项 - 完全按照数据库原始字段返回""" + mix_name = doc.get("mix_name", "") + + # 计算总点赞数 + episode_details = doc.get("episode_details", []) + total_likes = 0 + total_comments = 0 + + if episode_details: + for episode in episode_details: + total_likes += episode.get("likes", 0) + total_comments += len(episode.get("comments", [])) + + # 格式化总点赞数 + total_likes_formatted = format_interaction_count(total_likes) + total_comments_formatted = format_interaction_count(total_comments) + return { "_id": str(doc.get("_id", "")), "batch_time": format_time(doc.get("batch_time")), - "mix_name": doc.get("mix_name", ""), + "mix_name": mix_name, "video_url": doc.get("video_url", ""), "playcount": doc.get("playcount", ""), "play_vv": doc.get("play_vv", 0), @@ -131,16 +192,26 @@ def format_mix_item(doc): "cover_image_url": doc.get("cover_image_url", ""), # 新增字段 "series_author": doc.get("series_author", ""), + "Manufacturing_Field": doc.get("Manufacturing_Field", ""), + "Copyright_field": doc.get("Copyright_field", ""), "desc": doc.get("desc", ""), "updated_to_episode": doc.get("updated_to_episode", 0), "cover_backup_urls": doc.get("cover_backup_urls", []), "mix_id": doc.get("mix_id", ""), "episode_video_ids": doc.get("episode_video_ids", []), - "episode_details": doc.get("episode_details", []) + "episode_details": doc.get("episode_details", []), + # 点赞和评论总数 + "total_likes": total_likes, + "total_likes_formatted": total_likes_formatted, + "total_comments": total_comments, + "total_comments_formatted": total_comments_formatted, + # 播放量变化数据 + "timeline_data": doc.get("timeline_data", []), + } -def get_mix_list(page=1, limit=20, sort_by="playcount"): - """获取合集列表(分页)""" +def get_mix_list(page=1, limit=20, sort_by="playcount", classification_type=None): + """获取合集列表(分页)- 从Ranking_storage的data数组中获取数据,支持分类筛选""" try: # 计算跳过的数量 skip = (page - 1) * limit @@ -150,55 +221,85 @@ def 
get_mix_list(page=1, limit=20, sort_by="playcount"): # 按增长排序需要特殊处理 return get_growth_mixes(page, limit) else: - sort_field = "play_vv" if sort_by == "playcount" else "batch_time" - sort_order = -1 # 降序 - # 获取今天的日期 today = datetime.now().date() + today_str = today.strftime("%Y-%m-%d") - # 只查询今天的数据 - query_condition = { - "batch_time": { - "$gte": datetime(today.year, today.month, today.day), - "$lt": datetime(today.year, today.month, today.day) + timedelta(days=1) + # 从Ranking_storage中获取今天的数据 + ranking_doc = collection.find_one({ + "date": today_str, + "type": {"$in": ["comprehensive", "playcount"]} # 查找包含播放量数据的榜单 + }, sort=[("calculation_sequence", -1)]) # 获取最新的计算结果 + + if not ranking_doc or "data" not in ranking_doc: + # 如果没有找到今天的数据,返回空结果 + logging.warning(f"Ranking_storage中未找到 {today_str} 的数据") + return { + "success": True, + "message": f"暂无 {today_str} 的数据,请等待定时任务生成", + "data": [], + "pagination": { + "page": page, + "limit": limit, + "total": 0, + "pages": 0, + "has_next": False, + "has_prev": False + }, + "sort_by": sort_by, + "data_source": "ranking_storage", + "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } - } - # 查询数据并按短剧名称分组,取每个短剧的最新记录 - pipeline = [ - {"$match": query_condition}, - {"$sort": {"batch_time": -1}}, # 按时间倒序 - {"$group": { - "_id": "$mix_name", # 按短剧名称分组 - "latest_doc": {"$first": "$$ROOT"} # 取每个分组的第一条记录(最新记录) - }}, - {"$replaceRoot": {"newRoot": "$latest_doc"}}, - {"$sort": {sort_field: sort_order}}, - {"$skip": skip}, - {"$limit": limit} - ] + # 获取data数组中的数据 + mix_data = ranking_doc.get("data", []) + + # 分类筛选逻辑 + if classification_type: + filtered_data = [] + classification_field_map = { + 'novel': 'Novel_IDs', + 'anime': 'Anime_IDs', + 'drama': 'Drama_IDs' + } + + if classification_type in classification_field_map: + field_name = classification_field_map[classification_type] + + for item in mix_data: + mix_id = item.get('mix_id') + if mix_id: + # 检查该mix_id是否在对应的分类字段中 + classification_ids = item.get(field_name, []) + if isinstance(classification_ids, list) and mix_id in classification_ids: + filtered_data.append(item) + + mix_data = filtered_data + logging.info(f"分类筛选 {classification_type}: 筛选出 {len(mix_data)} 条数据") + + # 按播放量排序(如果需要) + if sort_by == "playcount": + mix_data = sorted(mix_data, key=lambda x: x.get("play_vv", 0), reverse=True) - docs = list(collection.aggregate(pipeline)) + # 分页处理 + total = len(mix_data) + paginated_data = mix_data[skip:skip + limit] - # 获取总数 - total_pipeline = [ - {"$match": query_condition}, - {"$sort": {"batch_time": -1}}, - {"$group": {"_id": "$mix_name"}}, - {"$count": "total"} - ] - total_result = list(collection.aggregate(total_pipeline)) - total = total_result[0]["total"] if total_result else 0 - - # 格式化数据 - mix_list = [] - for doc in docs: - item = format_mix_item(doc) - mix_list.append(item) + # 为分页数据添加排名并格式化 + formatted_data = [] + for i, item in enumerate(paginated_data): + item["rank"] = skip + i + 1 + # 确保mix_name字段存在 + if "mix_name" not in item and "title" in item: + item["mix_name"] = item["title"] + + # 使用format_mix_item函数格式化数据,包括计算总点赞数 + formatted_item = format_mix_item(item) + formatted_data.append(formatted_item) return { "success": True, - "data": mix_list, + "data": formatted_data, "pagination": { "page": page, "limit": limit, @@ -208,48 +309,80 @@ def get_mix_list(page=1, limit=20, sort_by="playcount"): "has_prev": page > 1 }, "sort_by": sort_by, - "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "data_source": "ranking_storage", + "update_time": ranking_doc.get("created_at", 
-def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None):
-    """Get mixes sorted by play-count growth - reads only precomputed data from Ranking_storage."""
+def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None, classification_type=None):
+    """Get mixes sorted by play-count growth - reads the matching date's data straight from Ranking_storage."""
     try:
         # Number of documents to skip
         skip = (page - 1) * limit
 
-        # If no dates were provided, default to today and yesterday
-        if not start_date or not end_date:
-            end_date = datetime.now().date()
-            start_date = end_date - timedelta(days=1)
-        else:
-            # Convert string dates to datetime objects
-            if isinstance(start_date, str):
-                start_date = datetime.strptime(start_date, "%Y-%m-%d").date()
-            if isinstance(end_date, str):
-                end_date = datetime.strptime(end_date, "%Y-%m-%d").date()
-
-        end_date_str = end_date.strftime("%Y-%m-%d")
-        start_date_str = start_date.strftime("%Y-%m-%d")
+        # Simplified date handling: use the date the frontend sent,
+        # preferring end_date, then start_date, then today.
+        if end_date:
+            if isinstance(end_date, str):
+                target_date = end_date
+            else:
+                target_date = end_date.strftime("%Y-%m-%d")
+        elif start_date:
+            if isinstance(start_date, str):
+                target_date = start_date
+            else:
+                target_date = start_date.strftime("%Y-%m-%d")
+        else:
+            target_date = datetime.now().date().strftime("%Y-%m-%d")
+
+        logging.info(f"📅 Query date: {target_date}")
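The three optional-date branches above reduce to one precedence rule: end_date, then start_date, then today, each normalized to a `YYYY-MM-DD` string. A minimal sketch of the same normalization (the function name is illustrative):

```python
from datetime import datetime, date

# Same rule as the branches above: prefer end_date, then start_date, then today.
def resolve_target_date(start_date=None, end_date=None):
    chosen = end_date or start_date or datetime.now().date()
    return chosen if isinstance(chosen, str) else chosen.strftime("%Y-%m-%d")

print(resolve_target_date(end_date="2024-05-01"))         # 2024-05-01
print(resolve_target_date(start_date=date(2024, 4, 30)))  # 2024-04-30
print(resolve_target_date())                              # today's date
```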
item.get("mix_id", "") + if mix_id: + # 查找对应的Rankings_management记录获取分类信息 + management_item = rankings_management_collection.find_one({"mix_id": mix_id}) + if management_item: + classification_ids = management_item.get(field_name, []) + if isinstance(classification_ids, list) and mix_id in classification_ids: + filtered_data.append(item) + + growth_data = filtered_data + # 分页处理 total = len(growth_data) paginated_data = growth_data[skip:skip + limit] - # 为分页数据添加排名 + # 为分页数据添加排名和补充完整字段信息 for i, item in enumerate(paginated_data): item["rank"] = skip + i + 1 + # 修复:使用mix_name字段,不要用空的title覆盖它 + mix_name = item.get("mix_name", "") + + if mix_name: + + # 优化:直接从Ranking_storage中获取已同步的字段信息 + # 查找对应日期的Ranking_storage记录 + ranking_storage_item = daily_rankings_collection.find_one({ + "date": target_date, + "mix_name": mix_name + }) + + if ranking_storage_item: + # 直接使用Ranking_storage中已同步的字段 + item.update({ + "Manufacturing_Field": ranking_storage_item.get("Manufacturing_Field", ""), + "Copyright_field": ranking_storage_item.get("Copyright_field", ""), + "series_author": ranking_storage_item.get("series_author", item.get("series_author", "")), + "video_id": ranking_storage_item.get("video_id", item.get("video_id", "")), + "video_url": ranking_storage_item.get("video_url", item.get("video_url", "")), + # 保持当前item中的封面和播放量数据(来自榜单计算) + "cover_image_url": item.get("cover_image_url", ranking_storage_item.get("cover_image_url", "")), + "play_vv": item.get("play_vv", ranking_storage_item.get("play_vv", 0)), + "playcount_str": item.get("playcount_str", ranking_storage_item.get("playcount_str", "0")) + }) + logging.info(f"从Ranking_storage获取到同步字段: {mix_name}") + else: + # 如果Ranking_storage中没有对应记录,回退到原有逻辑 + logging.warning(f"Ranking_storage中未找到 {mix_name} 的记录,回退到原有查询逻辑") + + # 根据查询日期判断数据源 + today = datetime.now().date() + # 将target_date字符串转换为日期对象进行比较 + try: + target_date_obj = datetime.strptime(target_date, "%Y-%m-%d").date() + is_historical_date = target_date_obj < today + except: + is_historical_date = False + + management_doc = None + + # 统一从Rankings_management获取数据 + management_doc = rankings_management_collection.find_one({"mix_name": mix_name}) + + if management_doc: + item.update({ + "Manufacturing_Field": management_doc.get("Manufacturing_Field", ""), + "Copyright_field": management_doc.get("Copyright_field", ""), + "series_author": management_doc.get("series_author", item.get("series_author", "")), + "video_id": management_doc.get("video_id", item.get("video_id", "")), + "video_url": management_doc.get("video_url", item.get("video_url", "")), + "cover_image_url": item.get("cover_image_url", management_doc.get("cover_image_url", "")), + "play_vv": item.get("play_vv", management_doc.get("play_vv", 0)), + "playcount_str": item.get("playcount_str", management_doc.get("playcount_str", "0")) + }) + else: + # 设置默认值 + item.update({ + "Manufacturing_Field": "", + "Copyright_field": "", + "series_author": item.get("series_author", ""), + "video_id": item.get("video_id", ""), + "video_url": item.get("video_url", ""), + "cover_image_url": item.get("cover_image_url", ""), + "play_vv": item.get("play_vv", 0), + "playcount_str": item.get("playcount_str", "0") + }) + else: + item["Manufacturing_Field"] = "" + item["Copyright_field"] = "" + + # 使用format_mix_item函数格式化所有数据,包括计算总点赞数 + formatted_data = [] + for item in paginated_data: + formatted_item = format_mix_item(item) + formatted_data.append(formatted_item) return { "success": True, - "data": paginated_data, + "data": formatted_data, "pagination": { "page": page, "limit": 
             return {
                 "success": True,
-                "data": paginated_data,
+                "data": formatted_data,
                 "pagination": {
                     "page": page,
                     "limit": limit,
@@ -282,18 +515,18 @@ def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None):
                 },
                 "sort_by": "growth",
                 "date_range": {
-                    "start_date": start_date_str,
-                    "end_date": end_date_str
+                    "start_date": target_date,
+                    "end_date": target_date
                 },
                 "data_source": "ranking_storage",  # data provenance marker
                 "update_time": growth_ranking.get("created_at", datetime.now()).strftime("%Y-%m-%d %H:%M:%S") if isinstance(growth_ranking.get("created_at"), datetime) else str(growth_ranking.get("created_at", ""))
             }
         else:
             # Nothing in Ranking_storage: return an empty result
-            logging.warning(f"No growth ranking for {end_date_str} in Ranking_storage")
+            logging.warning(f"No growth ranking for {target_date} in Ranking_storage")
             return {
                 "success": True,
-                "message": f"No growth ranking for {end_date_str} yet; waiting for the scheduled task to generate it",
+                "message": f"No growth ranking for {target_date} yet; waiting for the scheduled task to generate it",
                 "data": [],
                 "pagination": {
                     "page": page,
@@ -305,8 +538,8 @@ def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None):
                 },
                 "sort_by": "growth",
                 "date_range": {
-                    "start_date": start_date_str,
-                    "end_date": end_date_str
+                    "start_date": target_date,
+                    "end_date": target_date
                 },
                 "data_source": "ranking_storage",
                 "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
@@ -508,19 +741,32 @@ def get_statistics():
         return {"success": False, "message": f"Failed to get statistics: {str(e)}"}
 
 # Route definitions
+@rank_bp.route('/growth_mixes')
+def get_growth_mixes_route():
+    """Get the growth-ranking mix list."""
+    page = int(request.args.get('page', 1))
+    limit = int(request.args.get('limit', 20))
+    start_date = request.args.get('start_date')
+    end_date = request.args.get('end_date')
+    classification_type = request.args.get('classification_type')
+
+    result = get_growth_mixes(page, limit, start_date, end_date, classification_type)
+    return jsonify(result)
+
 @rank_bp.route('/videos')
 def get_videos():
-    """Get the mix list - compatible with the app.py call."""
+    """Get the mix list - compatible with the app.py call; supports classification filtering."""
     page = int(request.args.get('page', 1))
     limit = int(request.args.get('limit', 20))
     sort_by = request.args.get('sort', 'playcount')
+    classification_type = request.args.get('classification_type')  # new classification filter parameter
 
     if sort_by == 'growth':
         start_date = request.args.get('start_date')
         end_date = request.args.get('end_date')
-        result = get_growth_mixes(page, limit, start_date, end_date)
+        result = get_growth_mixes(page, limit, start_date, end_date, classification_type)
     else:
-        result = get_mix_list(page, limit, sort_by)
+        result = get_mix_list(page, limit, sort_by, classification_type)
 
     return jsonify(result)
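Both routes are plain GET endpoints, so they are easy to exercise from a client. A minimal sketch using `requests`; the host and port are illustrative:

```python
import requests

BASE = "http://localhost:5000"  # hypothetical dev host

# Growth ranking for a given date, filtered to the drama classification
r = requests.get(f"{BASE}/api/rank/growth_mixes",
                 params={"page": 1, "limit": 20,
                         "end_date": "2024-05-01",
                         "classification_type": "drama"})
print(r.json()["pagination"])

# /videos dispatches on sort: growth goes to get_growth_mixes,
# anything else to get_mix_list
r = requests.get(f"{BASE}/api/rank/videos",
                 params={"sort": "playcount", "classification_type": "novel"})
print(r.json().get("data_source"))  # "ranking_storage"
```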
@@ -822,4 +1068,744 @@ def get_rankings_stats():
     except Exception as e:
         logging.error(f"Failed to get ranking statistics: {e}")
-        return jsonify({"success": False, "message": f"Failed to get ranking statistics: {str(e)}"})
\ No newline at end of file
+        return jsonify({"success": False, "message": f"Failed to get ranking statistics: {str(e)}"})
+
+
+@rank_bp.route('/update_drama_info', methods=['POST'])
+def update_drama_info():
+    """Update drama info (with two-way sync across both collections)."""
+    try:
+        data = request.get_json()
+
+        # Validate required parameters
+        if not data or 'mix_name' not in data:
+            return jsonify({"success": False, "message": "Missing required parameter mix_name"})
+
+        mix_name = data['mix_name']
+        target_date = data.get('target_date')  # optional; used to tell whether this is today's data
+
+        # Collect the updatable fields that were actually provided
+        update_fields = {}
+        for field in ('title', 'series_author', 'Manufacturing_Field', 'Copyright_field',
+                      'desc', 'play_vv', 'cover_image_url', 'cover_backup_urls', 'timeline_data'):
+            if field in data:
+                update_fields[field] = data[field]
+
+        if not update_fields:
+            return jsonify({"success": False, "message": "No fields to update were provided"})
+
+        # Today's date
+        today = datetime.now().date().strftime('%Y-%m-%d')
+        is_today_data = target_date == today if target_date else True
+
+        # Make sure the drama exists first
+        existing_drama = rankings_management_collection.find_one({"mix_name": mix_name})
+        if not existing_drama:
+            return jsonify({
+                "success": False,
+                "message": f"Drama not found: {mix_name}"
+            })
+
+        # 1. Update Rankings_management
+        result_mgmt = rankings_management_collection.update_many(
+            {"mix_name": mix_name},
+            {"$set": update_fields}
+        )
+
+        # 2. Update the matching entries inside Ranking_storage's data array.
+        #    The positional "$" operator only touches the first matching array
+        #    element per document, so use the filtered positional operator.
+        result_storage = collection.update_many(
+            {"data.mix_name": mix_name},
+            {"$set": {f"data.$[elem].{field}": value for field, value in update_fields.items()}},
+            array_filters=[{"elem.mix_name": mix_name}]
+        )
+
+        updated_count = result_mgmt.modified_count + result_storage.modified_count
+        matched_count = result_mgmt.matched_count + result_storage.matched_count
+
+        logging.info(f"Update: Rankings_management(matched: {result_mgmt.matched_count}, modified: {result_mgmt.modified_count}), Ranking_storage(matched: {result_storage.matched_count}, modified: {result_storage.modified_count})")
+
+        # Finding the data counts as success, whether or not anything changed
+        if matched_count > 0:
+            message = f"Processed drama info for {mix_name}"
+            if updated_count > 0:
+                message += f"; {updated_count} records updated"
+            else:
+                message += "; no changes"
+
+            return jsonify({
+                "success": True,
+                "message": message,
+                "data": {
+                    "mix_name": mix_name,
+                    "updated_fields": list(update_fields.keys()),
+                    "updated_count": updated_count,
+                    "matched_count": matched_count,
+                    "is_today_data": is_today_data
+                }
+            })
+        else:
+            return jsonify({
+                "success": False,
+                "message": f"No data found for drama {mix_name}"
+            })
+
+    except Exception as e:
+        logging.error(f"Failed to update drama info: {e}")
+        return jsonify({"success": False, "message": f"Failed to update drama info: {str(e)}"})
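Because each Ranking_storage document embeds its items in a `data` array, the array-update semantics matter here: `data.$` would update only the *first* matching element per document, while `$[elem]` plus `array_filters` (the fix applied above) updates every element that matches. A standalone PyMongo sketch of the pattern; the connection string and collection names are illustrative:

```python
from pymongo import MongoClient

# "$[elem]" + array_filters updates every matching array element,
# whereas "data.$" would touch only the first one per document.
coll = MongoClient("mongodb://localhost:27017")["demo"]["Ranking_storage"]

result = coll.update_many(
    {"data.mix_name": "demo-mix"},
    {"$set": {"data.$[elem].Copyright_field": "Acme"}},
    array_filters=[{"elem.mix_name": "demo-mix"}],
)
print(result.matched_count, result.modified_count)
```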
+
+
+@rank_bp.route('/update_content_classification', methods=['POST'])
+def update_content_classification():
+    """Update content classification (add or remove a drama's ID in the matching classification field)."""
+    try:
+        data = request.get_json()
+
+        # Validate required parameters
+        if not data or 'mix_name' not in data or 'classification_type' not in data:
+            return jsonify({"success": False, "message": "Missing required parameter mix_name or classification_type"})
+
+        mix_name = data['mix_name']
+        classification_type = data['classification_type']  # 'novel', 'anime', 'drama'
+        action = data.get('action', 'add')  # 'add' or 'remove'
+        exclusive = data.get('exclusive', True)  # exclusive mode by default: a drama belongs to at most one classification
+
+        # Validate the classification type
+        valid_types = ['novel', 'anime', 'drama']
+        if classification_type not in valid_types:
+            return jsonify({"success": False, "message": f"Invalid classification type; supported types: {valid_types}"})
+
+        # Map the classification type to its field name
+        field_mapping = {
+            'novel': 'Novel_IDs',
+            'anime': 'Anime_IDs',
+            'drama': 'Drama_IDs'
+        }
+        field_name = field_mapping[classification_type]
+
+        # Get the drama's mix_id from Rankings_management first
+        mgmt_doc = rankings_management_collection.find_one({"mix_name": mix_name})
+        if not mgmt_doc:
+            return jsonify({"success": False, "message": f"Drama not found: {mix_name}"})
+
+        mix_id = mgmt_doc.get('mix_id')
+        if not mix_id:
+            return jsonify({"success": False, "message": f"Drama {mix_name} has no mix_id"})
+
+        updated_count = 0
+        removed_from_other_categories = []
+
+        if action == 'add':
+            # In exclusive mode, remove the drama from the other classifications first
+            if exclusive:
+                other_fields = [f for f in field_mapping.values() if f != field_name]
+
+                # 1. Remove from the other classifications in Rankings_management
+                for other_field in other_fields:
+                    result = rankings_management_collection.update_many(
+                        {"mix_name": mix_name, other_field: mix_id},
+                        {"$pull": {other_field: mix_id}}
+                    )
+                    if result.modified_count > 0:
+                        # Record which classification it was removed from
+                        for cat_type, field in field_mapping.items():
+                            if field == other_field:
+                                removed_from_other_categories.append(cat_type)
+                                break
+
+                # 2. Remove from the other classifications inside Ranking_storage's data
+                #    array (filtered positional operator: every matching element)
+                for other_field in other_fields:
+                    collection.update_many(
+                        {"data.mix_name": mix_name},
+                        {"$pull": {f"data.$[elem].{other_field}": mix_id}},
+                        array_filters=[{"elem.mix_name": mix_name}]
+                    )
+
+                if removed_from_other_categories:
+                    logging.info(f"Exclusive mode: removed drama {mix_name} from the {', '.join(removed_from_other_categories)} classification(s)")
+                else:
+                    logging.info(f"Exclusive mode: drama {mix_name} was not in any other classification; nothing to remove")
+
+            # Add to the classification field ($addToSet avoids duplicates)
+            # 1. Update Rankings_management
+            result_mgmt = rankings_management_collection.update_many(
+                {"mix_name": mix_name},
+                {"$addToSet": {field_name: mix_id}}
+            )
+
+            # 2. Update Ranking_storage's data array
+            result_storage = collection.update_many(
+                {"data.mix_name": mix_name},
+                {"$addToSet": {f"data.$[elem].{field_name}": mix_id}},
+                array_filters=[{"elem.mix_name": mix_name}]
+            )
+
+            updated_count = result_mgmt.modified_count + result_storage.modified_count
+            message = f"Added drama {mix_name} to the {classification_type} classification"
+            if exclusive and removed_from_other_categories:
+                message += f" (automatically removed from {', '.join(removed_from_other_categories)})"
+
+        elif action == 'remove':
+            # Remove from the classification field
+            # 1. Update Rankings_management
+            result_mgmt = rankings_management_collection.update_many(
+                {"mix_name": mix_name},
+                {"$pull": {field_name: mix_id}}
+            )
+
+            # 2. Update Ranking_storage's data array
+            result_storage = collection.update_many(
+                {"data.mix_name": mix_name},
+                {"$pull": {f"data.$[elem].{field_name}": mix_id}},
+                array_filters=[{"elem.mix_name": mix_name}]
+            )
+
+            updated_count = result_mgmt.modified_count + result_storage.modified_count
+            message = f"Removed drama {mix_name} from the {classification_type} classification"
+
+        else:
+            return jsonify({"success": False, "message": "Invalid action; use 'add' or 'remove'"})
+
+        logging.info(f"Classification update: {message}, Rankings_management({result_mgmt.modified_count}), Ranking_storage({result_storage.modified_count})")
+
+        # Read back the classification status after the update
+        updated_mgmt_doc = rankings_management_collection.find_one({"mix_name": mix_name})
+        classification_status = {
+            'novel': mix_id in updated_mgmt_doc.get('Novel_IDs', []) if updated_mgmt_doc else False,
+            'anime': mix_id in updated_mgmt_doc.get('Anime_IDs', []) if updated_mgmt_doc else False,
+            'drama': mix_id in updated_mgmt_doc.get('Drama_IDs', []) if updated_mgmt_doc else False
+        }
+
+        return jsonify({
+            "success": True,
+            "message": message,
+            "data": {
+                "mix_name": mix_name,
+                "mix_id": mix_id,
+                "classification_type": classification_type,
+                "field_name": field_name,
+                "action": action,
+                "updated_count": updated_count,
+                "classification_status": classification_status
+            }
+        })
+
+    except Exception as e:
+        logging.error(f"Failed to update content classification: {e}")
+        return jsonify({"success": False, "message": f"Failed to update content classification: {str(e)}"})
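Stripped of request handling, the exclusive add is "pull the ID out of every other list, then `$addToSet` it into the target list". A compact sketch of that move, covering only the Rankings_management side (the helper name and collection handle are illustrative):

```python
# Compact sketch of the exclusive "move to one classification" flow above:
# $pull from every other list, then $addToSet into the target (no duplicates).
FIELDS = {"novel": "Novel_IDs", "anime": "Anime_IDs", "drama": "Drama_IDs"}

def move_to_classification(mgmt_coll, mix_name, mix_id, target):
    target_field = FIELDS[target]
    for field in FIELDS.values():
        if field != target_field:
            mgmt_coll.update_many({"mix_name": mix_name},
                                  {"$pull": {field: mix_id}})
    mgmt_coll.update_many({"mix_name": mix_name},
                          {"$addToSet": {target_field: mix_id}})
```

The `$pull`-then-`$addToSet` order matters: doing the add first would leave a window where the drama appears in two classifications if the pulls fail midway.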
return jsonify({"success": False, "message": f"短剧 {mix_name} 缺少 mix_id"}) + + # 检查短剧在各个分类中的状态 + novel_ids = mgmt_doc.get('Novel_IDs', []) + anime_ids = mgmt_doc.get('Anime_IDs', []) + drama_ids = mgmt_doc.get('Drama_IDs', []) + + classification_status = { + 'novel': mix_id in novel_ids, + 'anime': mix_id in anime_ids, + 'drama': mix_id in drama_ids + } + + return jsonify({ + "success": True, + "message": f"获取短剧 {mix_name} 分类状态成功", + "data": { + "mix_name": mix_name, + "mix_id": mix_id, + "classification_status": classification_status, + "classification_details": { + "Novel_IDs": novel_ids, + "Anime_IDs": anime_ids, + "Drama_IDs": drama_ids + } + } + }) + + except Exception as e: + logging.error(f"获取内容分类状态失败: {e}") + return jsonify({"success": False, "message": f"获取内容分类状态失败: {str(e)}"}) + + +def validate_and_fix_classification_exclusivity(): + """ + 验证和修复数据库中的分类互斥性 + 确保每个短剧只属于一个分类(Novel_IDs、Anime_IDs、Drama_IDs) + + Returns: + dict: 修复结果统计 + """ + try: + # 获取所有Rankings_management数据 + all_docs = list(rankings_management_collection.find({})) + + fixed_count = 0 + conflict_count = 0 + + for doc in all_docs: + mix_name = doc.get('mix_name', '') + mix_id = doc.get('mix_id') + + if not mix_id: + continue + + # 检查分类字段 + novel_ids = doc.get('Novel_IDs', []) + anime_ids = doc.get('Anime_IDs', []) + drama_ids = doc.get('Drama_IDs', []) + + # 统计该mix_id在多少个分类中出现 + classifications = [] + if mix_id in novel_ids: + classifications.append('novel') + if mix_id in anime_ids: + classifications.append('anime') + if mix_id in drama_ids: + classifications.append('drama') + + # 如果出现在多个分类中,需要修复 + if len(classifications) > 1: + conflict_count += 1 + logging.warning(f"发现分类冲突: {mix_name} 同时属于 {classifications}") + + # 保留最后一个分类,移除其他分类 + # 优先级:drama > anime > novel + if 'drama' in classifications: + keep_classification = 'drama' + elif 'anime' in classifications: + keep_classification = 'anime' + else: + keep_classification = 'novel' + + # 更新数据库 + update_fields = {} + if keep_classification == 'novel': + update_fields['Novel_IDs'] = novel_ids + update_fields['Anime_IDs'] = [id for id in anime_ids if id != mix_id] + update_fields['Drama_IDs'] = [id for id in drama_ids if id != mix_id] + elif keep_classification == 'anime': + update_fields['Novel_IDs'] = [id for id in novel_ids if id != mix_id] + update_fields['Anime_IDs'] = anime_ids + update_fields['Drama_IDs'] = [id for id in drama_ids if id != mix_id] + elif keep_classification == 'drama': + update_fields['Novel_IDs'] = [id for id in novel_ids if id != mix_id] + update_fields['Anime_IDs'] = [id for id in anime_ids if id != mix_id] + update_fields['Drama_IDs'] = drama_ids + + # 更新Rankings_management + rankings_management_collection.update_one( + {"mix_name": mix_name}, + {"$set": update_fields} + ) + + # 更新Ranking_storage + collection.update_many( + {"data.mix_name": mix_name}, + {"$set": { + f"data.$.Novel_IDs": update_fields['Novel_IDs'], + f"data.$.Anime_IDs": update_fields['Anime_IDs'], + f"data.$.Drama_IDs": update_fields['Drama_IDs'] + }} + ) + + fixed_count += 1 + logging.info(f"修复分类冲突: {mix_name} 保留为 {keep_classification} 分类") + + return { + "success": True, + "message": f"分类互斥性验证完成", + "data": { + "total_checked": len(all_docs), + "conflicts_found": conflict_count, + "conflicts_fixed": fixed_count + } + } + + except Exception as e: + logging.error(f"验证分类互斥性失败: {e}") + return { + "success": False, + "message": f"验证分类互斥性失败: {str(e)}" + } + + +def sync_ranking_storage_fields(target_date=None, force_update=False, max_retries=3, retry_delay=60): + """ + 
+
+
+def sync_ranking_storage_fields(target_date=None, force_update=False, max_retries=3, retry_delay=60):
+    """
+    Sync field information into Ranking_storage, pulling the values from
+    Rankings_management and saving them onto the matching data-array items.
+
+    Args:
+        target_date: target date as 'YYYY-MM-DD'; defaults to today
+        force_update: overwrite fields that already have values (default False)
+        max_retries: currently unused; kept for call-site compatibility
+        retry_delay: currently unused; kept for call-site compatibility
+
+    Returns:
+        dict: sync statistics
+    """
+    try:
+        # Resolve the target date
+        if target_date is None:
+            target_date_obj = datetime.now().date()
+            target_date = target_date_obj.strftime('%Y-%m-%d')
+        else:
+            target_date_obj = datetime.strptime(target_date, '%Y-%m-%d').date()
+
+        # Load the Ranking_storage documents for that date
+        ranking_storage_query = {"date": target_date}
+        ranking_storage_items = list(daily_rankings_collection.find(ranking_storage_query))
+
+        if not ranking_storage_items:
+            return {
+                "success": False,
+                "message": f"No Ranking_storage data found for {target_date}"
+            }
+
+        # Statistics
+        total_items = len(ranking_storage_items)
+        updated_items = 0
+        skipped_items = 0
+        error_items = 0
+
+        # 🔄 Sync logic: update every item inside each document's data array
+        for ranking_doc in ranking_storage_items:
+            try:
+                # The data array
+                data_array = ranking_doc.get('data', [])
+                if not data_array:
+                    logging.warning(f"Ranking_storage document has no data array: {ranking_doc.get('_id')}")
+                    skipped_items += 1
+                    continue
+
+                # Track whether anything in this document changed
+                doc_updated = False
+                updated_data_array = []
+
+                # Walk every item in the data array
+                for data_item in data_array:
+                    try:
+                        mix_name = data_item.get('mix_name')
+
+                        # 🔧 If mix_name is empty, try other ways of finding the source record
+                        source_data = None
+                        if mix_name:
+                            # Prefer a mix_name lookup against Rankings_management
+                            source_data = rankings_management_collection.find_one({"mix_name": mix_name})
+
+                        # Nothing found (or mix_name was empty): try other matches
+                        if not source_data:
+                            # Option 1: match on mix_id, if present
+                            mix_id = data_item.get('mix_id')
+                            if mix_id:
+                                source_data = rankings_management_collection.find_one({"mix_id": mix_id})
+                                if source_data:
+                                    logging.info(f"Found source data by mix_id: {mix_id} -> {source_data.get('mix_name', 'N/A')}")
+
+                            # Option 2: fall back to a title match
+                            if not source_data:
+                                title = data_item.get('title')
+                                if title:
+                                    source_data = rankings_management_collection.find_one({"mix_name": title})
+                                    if source_data:
+                                        logging.info(f"Found source data by title: {title} -> {source_data.get('mix_name', 'N/A')}")
+
+                        # If source data was found and mix_name was empty, repair it
+                        if source_data and not mix_name:
+                            mix_name = source_data.get('mix_name', '')
+                            data_item['mix_name'] = mix_name
+                            logging.info(f"Repaired empty mix_name: {data_item.get('title', 'N/A')} -> {mix_name}")
+
+                        # Still no source data: keep the item unchanged
+                        if not source_data:
+                            logging.warning(f"No source data found: mix_name={mix_name}, mix_id={data_item.get('mix_id')}, title={data_item.get('title')}")
+                            updated_data_array.append(data_item)
+                            continue
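The source lookup above is a three-step fallback chain: mix_name, then mix_id, then title (matched against mix_name). Extracted as a standalone helper it is easier to see and test; the helper name is illustrative:

```python
# Sketch of the three-step source lookup above; returns the management
# document or None. The collection handle is passed in for testability.
def find_source(mgmt_coll, data_item):
    for key, field in (("mix_name", "mix_name"),   # 1. exact mix_name match
                       ("mix_id", "mix_id"),       # 2. fall back to mix_id
                       ("title", "mix_name")):     # 3. treat title as a mix_name
        value = data_item.get(key)
        if value:
            doc = mgmt_coll.find_one({field: value})
            if doc:
                return doc
    return None
```

This also documents the implicit assumption in the original: a `title` is expected to equal a `mix_name` in Rankings_management, which is why the third step queries the `mix_name` field.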
+
+                        # Which fields might need syncing - the full Rankings_management field set
+                        fields_to_check = {
+                            # Base fields
+                            'batch_id': data_item.get('batch_id'),
+                            'batch_time': data_item.get('batch_time'),
+                            'item_sequence': data_item.get('item_sequence'),
+                            'mix_id': data_item.get('mix_id'),
+                            'playcount': data_item.get('playcount'),
+                            'request_id': data_item.get('request_id'),
+                            # Cover fields
+                            'cover_image_url_original': data_item.get('cover_image_url_original'),
+                            'cover_upload_success': data_item.get('cover_upload_success'),
+                            'cover_backup_urls': data_item.get('cover_backup_urls'),
+                            # Content fields
+                            'desc': data_item.get('desc'),
+                            'series_author': data_item.get('series_author'),
+                            'updated_to_episode': data_item.get('updated_to_episode'),
+                            'episode_video_ids': data_item.get('episode_video_ids'),
+                            'episode_details': data_item.get('episode_details'),
+                            # Status fields
+                            'data_status': data_item.get('data_status'),
+                            'realtime_saved': data_item.get('realtime_saved'),
+                            'created_at': data_item.get('created_at'),
+                            'last_updated': data_item.get('last_updated'),
+                            'Manufacturing_Field': data_item.get('Manufacturing_Field'),
+                            'Copyright_field': data_item.get('Copyright_field'),
+                            # Classification fields
+                            'Novel_IDs': data_item.get('Novel_IDs', []),
+                            'Anime_IDs': data_item.get('Anime_IDs', []),
+                            'Drama_IDs': data_item.get('Drama_IDs', []),
+                        }
+
+                        # 🔒 Field-lock status for this document
+                        field_lock_status = ranking_doc.get('field_lock_status', {})
+                        locked_fields = {
+                            'Manufacturing_Field': field_lock_status.get('Manufacturing_Field_locked', False),
+                            'Copyright_field': field_lock_status.get('Copyright_field_locked', False),
+                            'Novel_IDs': field_lock_status.get('Novel_IDs_locked', False),
+                            'Anime_IDs': field_lock_status.get('Anime_IDs_locked', False),
+                            'Drama_IDs': field_lock_status.get('Drama_IDs_locked', False),
+                        }
+
+                        # Array-valued fields refresh when empty; scalars when falsy
+                        array_fields = ['cover_backup_urls', 'episode_video_ids', 'episode_details',
+                                        'Novel_IDs', 'Anime_IDs', 'Drama_IDs']
+
+                        # Does anything need updating?
+                        needs_update = False
+                        for field_name, field_value in fields_to_check.items():
+                            # 🔒 Locked fields are never refreshed
+                            if locked_fields.get(field_name):
+                                logging.info(f"[field lock] Skipping {field_name} update: {mix_name} (locked)")
+                                continue
+
+                            if field_name in array_fields:
+                                if force_update or field_value is None or (isinstance(field_value, list) and len(field_value) == 0):
+                                    needs_update = True
+                                    break
+                            elif force_update or field_value is None or field_value == '' or field_value == 0:
+                                needs_update = True
+                                break
+
+                        if not needs_update:
+                            updated_data_array.append(data_item)
+                            continue
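The per-field decision above condenses to a single predicate: locked fields never change; arrays refresh when `None` or empty; scalars refresh when `None`, `''`, or `0`; and `force_update` overrides everything except locks. A pure sketch of that rule (matching the refactored `locked_fields`/`array_fields` structure above, with illustrative names):

```python
# The per-field update decision as a pure predicate; mirrors the loop above.
ARRAY_FIELDS = {"cover_backup_urls", "episode_video_ids", "episode_details",
                "Novel_IDs", "Anime_IDs", "Drama_IDs"}

def should_update(field, value, locked=False, force=False):
    if locked:
        return False  # locks always win, even over force_update
    if force or value is None:
        return True
    if field in ARRAY_FIELDS:
        return isinstance(value, list) and len(value) == 0
    return value == "" or value == 0

print(should_update("desc", ""))                           # True
print(should_update("Novel_IDs", []))                      # True
print(should_update("Novel_IDs", ["id1"]))                 # False
print(should_update("Copyright_field", "", locked=True))   # False
```

One subtlety the predicate exposes: scalar fields that legitimately hold `0` (say, `updated_to_episode` before any episode lands) will keep being re-synced, because `0` is treated as "missing".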
+
+                        # Copy values from the source record into data_item
+                        item_updated = False
+                        for field_name, current_value in fields_to_check.items():
+                            # 🔒 Locked fields are protected from being overwritten
+                            if locked_fields.get(field_name):
+                                logging.info(f"[field lock] Protecting {field_name} from overwrite: {mix_name}")
+                                continue
+
+                            # Arrays update when empty; other fields when falsy
+                            if field_name in array_fields:
+                                should_update = force_update or current_value is None or (isinstance(current_value, list) and len(current_value) == 0)
+                            else:
+                                should_update = force_update or current_value is None or current_value == '' or current_value == 0
+
+                            if should_update:
+                                if field_name == 'episode_details':
+                                    # Copy episode_details straight from the source record
+                                    data_item[field_name] = source_data.get(field_name, [])
+                                    item_updated = True
+                                elif field_name in ('cover_backup_urls', 'episode_video_ids'):
+                                    # Make sure these fields stay lists
+                                    value = source_data.get(field_name, [])
+                                    if not isinstance(value, list):
+                                        value = []
+                                    data_item[field_name] = value
+                                    item_updated = True
+                                elif field_name in ('Novel_IDs', 'Anime_IDs', 'Drama_IDs'):
+                                    # Classification fields: enforce list type and exclusivity
+                                    classification_ids = source_data.get(field_name, [])
+                                    if not isinstance(classification_ids, list):
+                                        classification_ids = []
+
+                                    # Exclusivity: if this field has values, clear the other two
+                                    if classification_ids:
+                                        for other in ('Novel_IDs', 'Anime_IDs', 'Drama_IDs'):
+                                            if other != field_name:
+                                                data_item[other] = []
+
+                                    data_item[field_name] = classification_ids
+                                    item_updated = True
+                                else:
+                                    # Everything else: copy the source value directly
+                                    data_item[field_name] = source_data.get(field_name, '')
+                                    item_updated = True
+
+                        # 🔒 Important computed fields are never overwritten because they are
+                        # not in fields_to_check: rank, play_vv, video_id, video_url,
+                        # cover_image_url, playcount_str and timeline_data (the play-count deltas).
+
+                        if item_updated:
+                            doc_updated = True
+                            logging.info(f"✅ Synced data-item fields: {mix_name}")
+
+                        updated_data_array.append(data_item)
+
+                    except Exception as e:
+                        logging.error(f"Failed to sync data item {data_item.get('mix_name', 'N/A')}: {e}")
+                        # Keep the original item unchanged
+                        updated_data_array.append(data_item)
+                        continue
+
+                # If anything changed, write back the whole data array
+                if doc_updated:
+                    daily_rankings_collection.update_one(
+                        {"_id": ranking_doc["_id"]},
+                        {"$set": {"data": updated_data_array}}
+                    )
+                    updated_items += 1
+                    logging.info(f"✅ Updated the data array of the Ranking_storage document: {ranking_doc.get('date', 'N/A')}")
+                else:
+                    skipped_items += 1
+
+            except Exception as e:
+                logging.error(f"Failed to sync Ranking_storage document {ranking_doc.get('_id')}: {e}")
+                error_items += 1
+                continue
+
+        # The sync works on the data arrays directly, so no retry loop is needed
+        return {
+            "success": True,
+            "message": "Sync complete",
+            "stats": {
+                "target_date": target_date,
+                "total_items": total_items,
+                "updated_items": updated_items,
+                "skipped_items": skipped_items,
+                "error_items": error_items,
+                "data_source": "Rankings_management"
+            }
+        }
+
+    except Exception as e:
+        logging.error(f"Failed to sync Ranking_storage fields: {e}")
+        return {
+            "success": False,
+            "message": f"Sync failed: {str(e)}"
+        }
+
+
+@rank_bp.route('/sync_ranking_fields', methods=['POST'])
+def sync_ranking_fields():
+    """
+    API endpoint: sync Ranking_storage fields.
+    """
+    try:
+        data = request.get_json() or {}
+        target_date = data.get('target_date')
+        force_update = data.get('force_update', False)
+
+        result = sync_ranking_storage_fields(target_date, force_update)
+
+        if result["success"]:
+            return jsonify(result)
+        else:
+            return jsonify(result), 400
+
+    except Exception as e:
+        logging.error(f"Sync API call failed: {e}")
+        return jsonify({
+            "success": False,
+            "message": f"API call failed: {str(e)}"
+        }), 500
+
+
+@rank_bp.route('/validate_classification_exclusivity', methods=['POST'])
+def validate_classification_exclusivity_api():
+    """
+    API endpoint: validate and repair classification exclusivity, so that each
+    drama belongs to at most one of Novel_IDs, Anime_IDs and Drama_IDs.
+    """
+    try:
+        result = validate_and_fix_classification_exclusivity()
+
+        if result["success"]:
+            return jsonify(result)
+        else:
+            return jsonify(result), 400
+
+    except Exception as e:
+        logging.error(f"Classification-exclusivity API failed: {e}")
+        return jsonify({
+            "success": False,
+            "message": f"Failed to validate classification exclusivity: {str(e)}"
+        }), 500
\ No newline at end of file
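Both maintenance endpoints are POST routes that can be triggered by hand, which is useful when a day's snapshot was generated before its Rankings_management fields existed. A minimal client sketch; the host is illustrative:

```python
import requests

BASE = "http://localhost:5000"  # hypothetical dev host

# Re-sync a specific date's fields (force_update=False keeps existing values)
r = requests.post(f"{BASE}/api/rank/sync_ranking_fields",
                  json={"target_date": "2024-05-01", "force_update": False})
print(r.status_code, r.json().get("stats"))

# Repair any mixes that ended up in more than one classification
r = requests.post(f"{BASE}/api/rank/validate_classification_exclusivity")
print(r.status_code, r.json().get("data"))
```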
diff --git a/frontend/public/favicon.ico b/frontend/public/favicon.ico
deleted file mode 100644
index df36fcf..0000000
Binary files a/frontend/public/favicon.ico and /dev/null differ
diff --git a/frontend/public/placeholder-poster.svg b/frontend/public/placeholder-poster.svg
deleted file mode 100644
index 0e44635..0000000
--- a/frontend/public/placeholder-poster.svg
+++ /dev/null
@@ -1,5 +0,0 @@
-
-
-暂无
-图片
-
\ No newline at end of file
diff --git a/frontend/src/AdminPanel.vue b/frontend/src/AdminPanel.vue
new file mode 100644
index 0000000..299e774
--- /dev/null
+++ b/frontend/src/AdminPanel.vue
@@ -0,0 +1,1178 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/frontend/src/App.vue b/frontend/src/App.vue
index fe8a138..c809aa8 100644
--- a/frontend/src/App.vue
+++ b/frontend/src/App.vue
@@ -1,7 +1,11 @@
+
+
+
+
\ No newline at end of file
diff --git a/frontend/src/main.js b/frontend/src/main.js
index 01433bc..30a5f54 100644
--- a/frontend/src/main.js
+++ b/frontend/src/main.js
@@ -1,4 +1,8 @@
 import { createApp } from 'vue'
+import './style.css'
 import App from './App.vue'
+import router from './router'
 
-createApp(App).mount('#app')
+const app = createApp(App)
+
+app.use(router).mount('#app')
diff --git a/frontend/src/router/index.js b/frontend/src/router/index.js
new file mode 100644
index 0000000..ace39c5
--- /dev/null
+++ b/frontend/src/router/index.js
@@ -0,0 +1,17 @@
+import { createRouter, createWebHistory } from 'vue-router'
+import AdminPanel from '../AdminPanel.vue'
+
+const routes = [
+  {
+    path: '/admin',
+    name: 'Admin',
+    component: AdminPanel
+  }
+]
+
+const router = createRouter({
+  history: createWebHistory(),
+  routes
+})
+
+export default router
\ No newline at end of file
diff --git a/frontend/src/style.css b/frontend/src/style.css
new file mode 100644
index 0000000..ee1d7e2
--- /dev/null
+++ b/frontend/src/style.css
@@ -0,0 +1,60 @@
+/* Global style reset */
+* {
+  margin: 0;
+  padding: 0;
+  box-sizing: border-box;
+}
+
+body {
+  font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', 'Roboto', 'Oxygen',
+    'Ubuntu', 'Cantarell', 'Fira Sans', 'Droid Sans', 'Helvetica Neue',
+    sans-serif;
+  -webkit-font-smoothing: antialiased;
+  -moz-osx-font-smoothing: grayscale;
+  background-color: #ebedf2;
+  color: #333;
+}
+
+#app {
+  width: 100%;
+  min-height: 100vh;
+}
+
+/* Shared button styles */
+button {
+  cursor: pointer;
+  border: none;
+  outline: none;
+  font-family: inherit;
+}
+
+/* Shared input styles */
+input, textarea, select {
+  font-family: inherit;
+  outline: none;
+}
+
+/* Scrollbar styles */
+::-webkit-scrollbar {
+  width: 6px;
+}
+
+::-webkit-scrollbar-track {
+  background: #f1f1f1;
+}
+
+::-webkit-scrollbar-thumb {
+  background: #c1c1c1;
+  border-radius: 3px;
+}
+
+::-webkit-scrollbar-thumb:hover {
+  background: #a8a8a8;
+}
+
+/* Responsive tweaks */
+@media (max-width: 480px) {
+  body {
+    font-size: 14px;
+  }
+}
\ No newline at end of file