diff --git a/Timer_worker.py b/Timer_worker.py index 51e5b57..7364176 100644 --- a/Timer_worker.py +++ b/Timer_worker.py @@ -4,9 +4,16 @@ 抖音播放量自动抓取定时器 - 跨平台版本 功能: -- 每晚24:00自动执行抖音播放量抓取任务 +- 每晚自动执行抖音播放量抓取任务 +- 数据抓取完成后自动生类榜单 - 支持Windows、macOS、Linux - 自动保存数据到MongoDB + +使用方法: +- 正常模式:python Timer_worker.py(启动定时器) +- 测试模式:python Timer_worker.py --test(立即执行一次) +- 单次执行:python Timer_worker.py --once(立即执行一次并退出) +- 仅生成榜单:python Timer_worker.py --ranking-only(仅生成榜单,不抓取数据) """ import schedule @@ -14,14 +21,17 @@ import time import sys import os import logging +import argparse from pathlib import Path -from datetime import datetime +from datetime import datetime, date import config # 添加项目路径到 Python 路径 sys.path.append(os.path.join(os.path.dirname(__file__), 'handlers', 'Rankings')) from rank_data_scraper import DouyinPlayVVScraper + + # 配置日志的函数 def setup_logging(): """设置日志配置""" @@ -63,12 +73,198 @@ class DouyinAutoScheduler: scraper.run() logging.info("✅ 抖音播放量抓取任务执行成功") + + # 数据抓取完成后,自动生成当日榜单 + self.generate_daily_rankings() except Exception as e: logging.error(f"💥 执行任务时发生异常: {e}") import traceback logging.error(f"详细错误信息: {traceback.format_exc()}") + def generate_daily_rankings(self): + """生成每日榜单数据(基于时间轴对比)""" + try: + from database import db + from datetime import timedelta + + # 获取集合 + douyin_collection = db['Rankings_list'] # 使用真实抓取的数据 + rankings_collection = db['Ranking_storage'] + + today = date.today() + yesterday = today - timedelta(days=1) + today_str = today.strftime('%Y-%m-%d') + yesterday_str = yesterday.strftime('%Y-%m-%d') + + logging.info(f"📅 正在生成 {today_str} 的榜单(对比 {yesterday_str})...") + + # 删除当天已有的榜单数据 + rankings_collection.delete_many({"date": today_str}) + logging.info(f"🗑️ 已清理 {today_str} 的旧榜单数据") + + # 获取今天和昨天的榜单数据进行对比 + try: + logging.info("🔄 正在生成时间轴对比榜单...") + + # 获取今天的数据,按短剧名称去重,只保留播放量最高的 + today_videos_raw = list(douyin_collection.find({}).sort("play_vv", -1)) + + # 按短剧名称去重,每个短剧只保留播放量最高的一条 + unique_videos = {} + for video in today_videos_raw: + mix_name = video.get("mix_name", "") + if mix_name and (mix_name not in unique_videos or video.get("play_vv", 0) > unique_videos[mix_name].get("play_vv", 0)): + unique_videos[mix_name] = video + + today_videos = list(unique_videos.values()) + + logging.info(f"📊 今日数据去重后:{len(today_videos)} 个独特短剧(原始数据:{len(today_videos_raw)} 条)") + + # 获取昨天的榜单数据(如果存在),取最新的计算结果 + yesterday_ranking = rankings_collection.find_one({ + "date": yesterday_str, + "type": "comprehensive" + }, sort=[("calculation_sequence", -1)]) + + yesterday_data = {} + if yesterday_ranking and "data" in yesterday_ranking: + # 将昨天的数据转换为字典,以短剧名称为键 + for item in yesterday_ranking["data"]: + title = item.get("title", "") + if title: + yesterday_data[title] = { + "rank": item.get("rank", 0), + "play_vv": item.get("play_vv", 0), + "video_id": item.get("video_id", "") + } + logging.info(f"📊 找到昨天的榜单数据,共 {len(yesterday_data)} 个短剧") + else: + logging.info("📊 未找到昨天的榜单数据,将作为首次生成") + + if today_videos: + # 先计算所有视频的播放量差值 + videos_with_growth = [] + for video in today_videos: + video_id = str(video.get("_id", "")) + current_play_vv = video.get("play_vv", 0) + + # 计算与昨天的对比数据 + play_vv_change = 0 + play_vv_change_rate = 0 + is_new = True + + mix_name = video.get("mix_name", "") + if mix_name in yesterday_data: + is_new = False + yesterday_play_vv = yesterday_data[mix_name]["play_vv"] + + # 计算播放量变化 + play_vv_change = current_play_vv - yesterday_play_vv + if yesterday_play_vv > 0: + play_vv_change_rate = round((play_vv_change / yesterday_play_vv) * 100, 2) + + # 创建包含增长数据的视频项 + video_with_growth = { + "video": video, + "play_vv_change": play_vv_change, + "play_vv_change_rate": play_vv_change_rate, + "is_new": is_new, + "yesterday_data": yesterday_data.get(mix_name, {}) + } + videos_with_growth.append(video_with_growth) + + # 按播放量差值降序排序(差值越大排名越靠前) + videos_with_growth.sort(key=lambda x: x["play_vv_change"], reverse=True) + + comprehensive_ranking = { + "date": today_str, + "type": "comprehensive", + "name": "播放量增长榜单", + "description": f"基于 {yesterday_str} 和 {today_str} 播放量差值排序的榜单(差值越大排名越靠前)", + "comparison_date": yesterday_str, + "total_videos": len(videos_with_growth), + "data": [] + } + + # 生成排序后的榜单数据 + for i, item in enumerate(videos_with_growth, 1): + video = item["video"] + video_id = str(video.get("_id", "")) + current_play_vv = video.get("play_vv", 0) + mix_name = video.get("mix_name", "") + + # 计算排名变化(基于昨天的排名) + rank_change = 0 + if not item["is_new"] and item["yesterday_data"]: + yesterday_rank = item["yesterday_data"].get("rank", 0) + rank_change = yesterday_rank - i + + ranking_item = { + "rank": i, + "title": mix_name, + "play_vv": current_play_vv, + "author": video.get("author", ""), + "video_id": video_id, + "video_url": video.get("video_url", ""), + "cover_image_url": video.get("cover_image_url", ""), + "playcount_str": video.get("playcount", ""), + # 时间轴对比数据 + "timeline_data": { + "is_new": item["is_new"], + "rank_change": rank_change, + "play_vv_change": item["play_vv_change"], + "play_vv_change_rate": item["play_vv_change_rate"], + "yesterday_rank": item["yesterday_data"].get("rank", 0) if not item["is_new"] else 0, + "yesterday_play_vv": item["yesterday_data"].get("play_vv", 0) if not item["is_new"] else 0 + } + } + + comprehensive_ranking["data"].append(ranking_item) + + # 为每次计算添加唯一的时间戳,确保数据唯一性 + current_timestamp = datetime.now() + comprehensive_ranking["created_at"] = current_timestamp + comprehensive_ranking["calculation_id"] = f"{today_str}_{current_timestamp.strftime('%H%M%S')}" + + # 检查今天已有多少次计算 + existing_count = rankings_collection.count_documents({ + "date": today_str, + "type": "comprehensive" + }) + + comprehensive_ranking["calculation_sequence"] = existing_count + 1 + + # 总是插入新的榜单记录,保留所有历史计算数据 + rankings_collection.insert_one(comprehensive_ranking) + logging.info(f"📝 创建了新的今日榜单数据(第{existing_count + 1}次计算,包含最新差值)") + logging.info(f"🔖 计算ID: {comprehensive_ranking['calculation_id']}") + + # 统计信息 + new_count = sum(1 for item in comprehensive_ranking["data"] if item["timeline_data"]["is_new"]) + logging.info(f"✅ 时间轴对比榜单生成成功") + logging.info(f"📊 总计 {len(comprehensive_ranking['data'])} 条记录") + logging.info(f"🆕 新上榜 {new_count} 条") + logging.info(f"🔄 对比基准日期: {yesterday_str}") + + return True + else: + logging.warning("⚠️ 榜单生成失败:无今日数据") + return False + + except Exception as e: + logging.error(f"💥 生成时间轴对比榜单时发生异常: {e}") + import traceback + logging.error(f"详细错误信息: {traceback.format_exc()}") + return False + + except Exception as e: + logging.error(f"💥 生成榜单时发生异常: {e}") + import traceback + logging.error(f"详细错误信息: {traceback.format_exc()}") + + + def setup_schedule(self): """设置定时任务""" # 从配置文件读取执行时间 @@ -93,6 +289,11 @@ class DouyinAutoScheduler: """测试模式 - 立即执行一次""" logging.info("🧪 测试模式 - 立即执行抖音播放量抓取任务...") self.run_douyin_scraper() + + def run_ranking_only(self): + """仅生成榜单(不抓取数据)""" + logging.info("📊 仅生成榜单模式...") + self.generate_daily_rankings() def start_scheduler(self): """启动定时器""" @@ -119,25 +320,44 @@ class DouyinAutoScheduler: def main(): """主函数""" import argparse + + try: + parser = argparse.ArgumentParser(description='抖音播放量自动抓取定时器') + parser.add_argument('--test', action='store_true', help='测试模式 - 立即执行一次') + parser.add_argument('--once', action='store_true', help='立即执行一次并退出') + parser.add_argument('--ranking-only', action='store_true', help='仅生成榜单(不抓取数据)') - parser = argparse.ArgumentParser(description='抖音播放量自动抓取定时器') - parser.add_argument('--test', action='store_true', help='测试模式 - 立即执行一次') - parser.add_argument('--once', action='store_true', help='立即执行一次并退出') + args = parser.parse_args() - args = parser.parse_args() + # 设置日志配置 + setup_logging() + + print("正在初始化定时器...") + scheduler = DouyinAutoScheduler() - # 设置日志配置 - setup_logging() - - scheduler = DouyinAutoScheduler() - - if args.test: - scheduler.run_test() - elif args.once: - scheduler.run_once() - else: - scheduler.setup_schedule() - scheduler.start_scheduler() + if args.test: + print("执行测试模式...") + scheduler.run_test() + elif args.once: + print("执行单次模式...") + scheduler.run_once() + elif args.ranking_only: + print("执行榜单生成模式...") + scheduler.run_ranking_only() + else: + print("启动定时器模式...") + scheduler.setup_schedule() + scheduler.start_scheduler() + + print("程序执行完成") + + except Exception as e: + print(f"程序执行出错: {e}") + import traceback + traceback.print_exc() + return 1 + + return 0 if __name__ == '__main__': main() \ No newline at end of file diff --git a/config.py b/config.py index 29cb146..035616d 100644 --- a/config.py +++ b/config.py @@ -4,7 +4,7 @@ import importlib # 数据库配置 MONGO_URI = "mongodb://localhost:27017" # MONGO_URI = "mongodb://mongouser:Jdei2243afN@172.16.0.6:27017,172.16.0.4:27017/test?replicaSet=cmgo-r6qkaern_0&authSource=admin" -MONGO_DB_NAME = "kemeng_media" +MONGO_DB_NAME = "Rankings" # 应用配置 APP_ENV = os.getenv('APP_ENV', 'development') @@ -15,6 +15,6 @@ LOG_LEVEL = 'INFO' LOG_DIR = 'logs' # 定时器配置 -SCHEDULER_TIME = "00:01" # 定时器执行时间,格式为 HH:MM (24小时制) +SCHEDULER_TIME = "24:00" # 定时器执行时间,格式为 HH:MM (24小时制) print(f"Successfully loaded configuration for environment: {APP_ENV}") \ No newline at end of file diff --git a/routers/rank_api_routes.py b/routers/rank_api_routes.py index 4be0623..ecbe162 100644 --- a/routers/rank_api_routes.py +++ b/routers/rank_api_routes.py @@ -16,6 +16,7 @@ rank_bp = Blueprint('rank', __name__, url_prefix='/api/rank') # 获取数据库集合 collection = db['Rankings_list'] +daily_rankings_collection = db['Ranking_storage'] # 榜单存储表 def format_playcount(playcount_str): """格式化播放量字符串为数字""" @@ -63,6 +64,59 @@ def format_time(time_obj): else: return str(time_obj) +def sort_ranking_data(ranking_data, sort_by, sort_order='desc'): + """ + 对榜单数据进行动态排序 + + Args: + ranking_data: 榜单数据列表 + sort_by: 排序字段 (play_vv_change, play_vv_change_rate, play_vv, rank) + sort_order: 排序顺序 (asc, desc) + + Returns: + 排序后的榜单数据 + """ + try: + # 定义排序键函数 + def get_sort_key(item): + if sort_by == 'play_vv_change': + # 按播放量差值排序 + timeline_data = item.get('timeline_data', {}) + return timeline_data.get('play_vv_change', 0) + elif sort_by == 'play_vv_change_rate': + # 按播放量变化率排序 + timeline_data = item.get('timeline_data', {}) + return timeline_data.get('play_vv_change_rate', 0) + elif sort_by == 'play_vv': + # 按当前播放量排序 + return item.get('play_vv', 0) + elif sort_by == 'rank': + # 按排名排序 + return item.get('rank', 999999) + else: + # 默认按排名排序 + return item.get('rank', 999999) + + # 执行排序 + reverse = (sort_order == 'desc') + + # 对于排名字段,降序实际上是升序(排名越小越好) + if sort_by == 'rank': + reverse = (sort_order == 'asc') + + sorted_data = sorted(ranking_data, key=get_sort_key, reverse=reverse) + + # 重新分配排名 + for i, item in enumerate(sorted_data, 1): + item['current_sort_rank'] = i + + return sorted_data + + except Exception as e: + logging.error(f"排序榜单数据失败: {e}") + # 如果排序失败,返回原始数据 + return ranking_data + def format_mix_item(doc): """格式化合集数据项 - 完全按照数据库原始字段返回""" return { @@ -155,7 +209,7 @@ def get_mix_list(page=1, limit=20, sort_by="playcount"): return {"success": False, "message": f"获取数据失败: {str(e)}"} def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None): - """获取按播放量增长排序的合集列表""" + """获取按播放量增长排序的合集列表 - 优先从定时器生成的数据中读取""" try: # 计算跳过的数量 skip = (page - 1) * limit @@ -171,6 +225,53 @@ def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None): if isinstance(end_date, str): end_date = datetime.strptime(end_date, "%Y-%m-%d").date() + end_date_str = end_date.strftime("%Y-%m-%d") + start_date_str = start_date.strftime("%Y-%m-%d") + + # 优先尝试从定时器生成的增长榜数据中读取 + try: + growth_ranking = daily_rankings_collection.find_one({ + "date": end_date_str, + "type": "growth", + "start_date": start_date_str, + "end_date": end_date_str + }, sort=[("calculation_sequence", -1)]) # 获取最新的计算结果 + + if growth_ranking and "data" in growth_ranking: + logging.info(f"📈 从定时器生成的增长榜数据中读取 {end_date_str} 的增长榜") + + # 获取预先计算好的增长榜数据 + growth_data = growth_ranking["data"] + + # 分页处理 + total = len(growth_data) + paginated_data = growth_data[skip:skip + limit] + + return { + "success": True, + "data": paginated_data, + "pagination": { + "page": page, + "limit": limit, + "total": total, + "pages": (total + limit - 1) // limit, + "has_next": page * limit < total, + "has_prev": page > 1 + }, + "sort_by": "growth", + "date_range": { + "start_date": start_date_str, + "end_date": end_date_str + }, + "data_source": "timer_generated", # 标识数据来源 + "update_time": growth_ranking.get("created_at", datetime.now()).strftime("%Y-%m-%d %H:%M:%S") if isinstance(growth_ranking.get("created_at"), datetime) else str(growth_ranking.get("created_at", "")) + } + except Exception as e: + logging.warning(f"从定时器数据读取增长榜失败,将使用动态计算: {e}") + + # 如果定时器数据不存在或读取失败,回退到动态计算 + logging.info(f"📊 动态计算 {start_date_str} 到 {end_date_str} 的增长榜") + # 查询结束日期的数据 end_cursor = collection.find({ "batch_time": { @@ -204,15 +305,15 @@ def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None): if growth > 0: item = format_mix_item(end_item) item["growth"] = growth - item["start_date"] = start_date.strftime("%Y-%m-%d") - item["end_date"] = end_date.strftime("%Y-%m-%d") + item["start_date"] = start_date_str + item["end_date"] = end_date_str growth_data.append(item) else: # 如果开始日期没有数据,但结束日期有,也认为是新增长 item = format_mix_item(end_item) item["growth"] = end_item.get("play_vv", 0) - item["start_date"] = start_date.strftime("%Y-%m-%d") - item["end_date"] = end_date.strftime("%Y-%m-%d") + item["start_date"] = start_date_str + item["end_date"] = end_date_str growth_data.append(item) # 按增长值降序排序 @@ -239,9 +340,10 @@ def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None): }, "sort_by": "growth", "date_range": { - "start_date": start_date.strftime("%Y-%m-%d"), - "end_date": end_date.strftime("%Y-%m-%d") + "start_date": start_date_str, + "end_date": end_date_str }, + "data_source": "dynamic_calculation", # 标识数据来源 "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } @@ -499,4 +601,246 @@ def health_check(): }) except Exception as e: logging.error(f"健康检查失败: {e}") - return jsonify({"success": False, "message": f"服务异常: {str(e)}"}) \ No newline at end of file + return jsonify({"success": False, "message": f"服务异常: {str(e)}"}) + + +# ==================== 榜单查询API接口 ==================== + +@rank_bp.route('/rankings') +def get_rankings(): + """获取榜单列表 - 支持按日期和类型查询,支持动态排序""" + try: + # 获取查询参数 + date = request.args.get('date') # 日期,格式:YYYY-MM-DD + ranking_type = request.args.get('type') # 榜单类型:playcount, growth, newcomer + sort_by = request.args.get('sort_by', 'default') # 排序方式:default, play_vv_change, play_vv_change_rate, play_vv + sort_order = request.args.get('sort_order', 'desc') # 排序顺序:asc, desc + page = int(request.args.get('page', 1)) + limit = int(request.args.get('limit', 50)) + + # 构建查询条件 + query = {} + if date: + query['date'] = date + if ranking_type: + query['ranking_type'] = ranking_type + + # 如果没有指定日期,默认获取最新日期的榜单 + if not date: + latest_ranking = daily_rankings_collection.find_one( + {}, sort=[('date', -1)] + ) + if latest_ranking: + query['date'] = latest_ranking['date'] + + # 查询榜单 + rankings = list(daily_rankings_collection.find(query).sort('generated_at', -1)) + + if not rankings: + return jsonify({ + "success": True, + "message": "暂无榜单数据", + "data": { + "rankings": [], + "total": 0, + "page": page, + "limit": limit + } + }) + + # 格式化返回数据 + formatted_rankings = [] + for ranking in rankings: + ranking_data = ranking.get('data', []) + + # 动态排序逻辑 + if sort_by != 'default' and ranking_data: + ranking_data = sort_ranking_data(ranking_data, sort_by, sort_order) + + # 分页处理榜单数据 + start_idx = (page - 1) * limit + end_idx = start_idx + limit + paginated_data = ranking_data[start_idx:end_idx] + + formatted_rankings.append({ + "date": ranking.get('date'), + "ranking_type": ranking.get('ranking_type'), + "ranking_name": ranking.get('ranking_name'), + "description": ranking.get('description'), + "data": paginated_data, + "total_count": len(ranking_data), + "current_page_count": len(paginated_data), + "generated_at": format_time(ranking.get('generated_at')), + "version": ranking.get('version', '1.0'), + "sort_info": { + "sort_by": sort_by, + "sort_order": sort_order + } + }) + + return jsonify({ + "success": True, + "message": "获取榜单成功", + "data": { + "rankings": formatted_rankings, + "total": len(formatted_rankings), + "page": page, + "limit": limit, + "sort_by": sort_by, + "sort_order": sort_order + } + }) + + except Exception as e: + logging.error(f"获取榜单失败: {e}") + return jsonify({"success": False, "message": f"获取榜单失败: {str(e)}"}) + + +@rank_bp.route('/rankings/dates') +def get_ranking_dates(): + """获取可用的榜单日期列表""" + try: + # 获取所有不重复的日期 + dates = daily_rankings_collection.distinct('date') + dates.sort(reverse=True) # 按日期倒序排列 + + return jsonify({ + "success": True, + "message": "获取日期列表成功", + "data": { + "dates": dates, + "total": len(dates) + } + }) + + except Exception as e: + logging.error(f"获取日期列表失败: {e}") + return jsonify({"success": False, "message": f"获取日期列表失败: {str(e)}"}) + + +@rank_bp.route('/rankings/types') +def get_ranking_types(): + """获取支持的榜单类型""" + try: + # 获取所有不重复的榜单类型 + types = daily_rankings_collection.distinct('ranking_type') + + # 添加类型说明 + type_descriptions = { + 'playcount': '播放量榜 - 按播放量排序', + 'growth': '增长榜 - 播放量增长最快', + 'newcomer': '新晋榜 - 新上榜内容' + } + + formatted_types = [] + for type_name in types: + formatted_types.append({ + "type": type_name, + "description": type_descriptions.get(type_name, type_name) + }) + + return jsonify({ + "success": True, + "message": "获取榜单类型成功", + "data": { + "types": formatted_types, + "total": len(formatted_types) + } + }) + + except Exception as e: + logging.error(f"获取榜单类型失败: {e}") + return jsonify({"success": False, "message": f"获取榜单类型失败: {str(e)}"}) + + +@rank_bp.route('/rankings/latest') +def get_latest_rankings(): + """获取最新的所有类型榜单""" + try: + # 获取最新日期 + latest_ranking = daily_rankings_collection.find_one( + {}, sort=[('date', -1)] + ) + + if not latest_ranking: + return jsonify({ + "success": True, + "message": "暂无榜单数据", + "data": { + "date": None, + "rankings": [] + } + }) + + latest_date = latest_ranking['date'] + + # 获取该日期的所有榜单 + rankings = list(daily_rankings_collection.find({ + 'date': latest_date + }).sort('ranking_type', 1)) + + formatted_rankings = [] + for ranking in rankings: + # 只返回前20条数据 + ranking_data = ranking.get('data', [])[:20] + + formatted_rankings.append({ + "ranking_type": ranking.get('ranking_type'), + "ranking_name": ranking.get('ranking_name'), + "description": ranking.get('description'), + "data": ranking_data, + "total_count": ranking.get('total_count', 0), + "preview_count": len(ranking_data) + }) + + return jsonify({ + "success": True, + "message": "获取最新榜单成功", + "data": { + "date": latest_date, + "rankings": formatted_rankings, + "total_types": len(formatted_rankings) + } + }) + + except Exception as e: + logging.error(f"获取最新榜单失败: {e}") + return jsonify({"success": False, "message": f"获取最新榜单失败: {str(e)}"}) + + +@rank_bp.route('/rankings/stats') +def get_rankings_stats(): + """获取榜单统计信息""" + try: + # 统计总榜单数 + total_rankings = daily_rankings_collection.count_documents({}) + + # 统计日期数量 + total_dates = len(daily_rankings_collection.distinct('date')) + + # 统计榜单类型数量 + total_types = len(daily_rankings_collection.distinct('ranking_type')) + + # 获取最新和最早日期 + latest_ranking = daily_rankings_collection.find_one({}, sort=[('date', -1)]) + earliest_ranking = daily_rankings_collection.find_one({}, sort=[('date', 1)]) + + latest_date = latest_ranking['date'] if latest_ranking else None + earliest_date = earliest_ranking['date'] if earliest_ranking else None + + return jsonify({ + "success": True, + "message": "获取榜单统计成功", + "data": { + "total_rankings": total_rankings, + "total_dates": total_dates, + "total_types": total_types, + "latest_date": latest_date, + "earliest_date": earliest_date, + "date_range": f"{earliest_date} 至 {latest_date}" if earliest_date and latest_date else "暂无数据" + } + }) + + except Exception as e: + logging.error(f"获取榜单统计失败: {e}") + return jsonify({"success": False, "message": f"获取榜单统计失败: {str(e)}"}) \ No newline at end of file