#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Douyin play-count data API server for the mini program.

Data formats and endpoints are optimized for mini-program use.
"""
from flask import Blueprint, request, jsonify
from datetime import datetime, timedelta
import logging
import re
from database import db

# Create the blueprint
rank_bp = Blueprint('rank', __name__, url_prefix='/api/rank')

# Get the database collection
collection = db['Rankings_list']


def format_playcount(playcount_str):
    """Convert a play-count string into an integer."""
    if not playcount_str:
        return 0
    try:
        if isinstance(playcount_str, (int, float)):
            return int(playcount_str)
        playcount_str = str(playcount_str).strip()
        # Handle the 亿 (100 million) and 万 (10 thousand) units
        if "亿" in playcount_str:
            num = float(re.findall(r'[\d.]+', playcount_str)[0])
            return int(num * 100000000)
        elif "万" in playcount_str:
            num = float(re.findall(r'[\d.]+', playcount_str)[0])
            return int(num * 10000)
        else:
            # Try a direct numeric conversion
            return int(float(playcount_str))
    except (ValueError, IndexError):
        return 0


def format_cover_url(cover_data):
    """Normalize the cover image URL."""
    if not cover_data:
        return ""
    if isinstance(cover_data, str):
        return cover_data
    elif isinstance(cover_data, dict) and 'url_list' in cover_data:
        return cover_data['url_list'][0] if cover_data['url_list'] else ""
    else:
        return ""


def format_time(time_obj):
    """Format a timestamp as a string."""
    if not time_obj:
        return ""
    if isinstance(time_obj, datetime):
        return time_obj.strftime("%Y-%m-%d %H:%M:%S")
    else:
        return str(time_obj)


def format_mix_item(doc):
    """Format a mix document, returning exactly the raw database fields."""
    return {
        "_id": str(doc.get("_id", "")),
        "batch_time": format_time(doc.get("batch_time")),
        "mix_name": doc.get("mix_name", ""),
        "video_url": doc.get("video_url", ""),
        "playcount": doc.get("playcount", ""),
        "play_vv": doc.get("play_vv", 0),
        "request_id": doc.get("request_id", ""),
        "rank": doc.get("rank", 0),
        "cover_image_url": doc.get("cover_image_url", ""),
        "cover_backup_urls": doc.get("cover_backup_urls", [])
    }

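# Illustrative examples for the helpers above (sample inputs are assumptions,
# not values taken from the Rankings_list collection):
#
#   >>> format_playcount("1.2亿")
#   120000000
#   >>> format_playcount("3500万")
#   35000000
#   >>> format_playcount("not a number")
#   0
#   >>> format_cover_url({"url_list": ["https://example.com/cover.jpg"]})
#   'https://example.com/cover.jpg'
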
def get_mix_list(page=1, limit=20, sort_by="playcount"):
    """Get the mix list (paginated)."""
    try:
        # Number of documents to skip
        skip = (page - 1) * limit

        # Choose the sort field
        if sort_by == "growth":
            # Growth sorting needs special handling
            return get_growth_mixes(page, limit)
        else:
            sort_field = "play_vv" if sort_by == "playcount" else "batch_time"
            sort_order = -1  # descending

        # Today's date
        today = datetime.now().date()

        # Only query today's data
        query_condition = {
            "batch_time": {
                "$gte": datetime(today.year, today.month, today.day),
                "$lt": datetime(today.year, today.month, today.day) + timedelta(days=1)
            }
        }

        # Group by mix name and keep only the latest record of each mix
        pipeline = [
            {"$match": query_condition},
            {"$sort": {"batch_time": -1}},  # newest first
            {"$group": {
                "_id": "$mix_name",                 # group by mix name
                "latest_doc": {"$first": "$$ROOT"}  # first doc per group = latest record
            }},
            {"$replaceRoot": {"newRoot": "$latest_doc"}},
            {"$sort": {sort_field: sort_order}},
            {"$skip": skip},
            {"$limit": limit}
        ]
        docs = list(collection.aggregate(pipeline))

        # Total number of distinct mixes
        total_pipeline = [
            {"$match": query_condition},
            {"$sort": {"batch_time": -1}},
            {"$group": {"_id": "$mix_name"}},
            {"$count": "total"}
        ]
        total_result = list(collection.aggregate(total_pipeline))
        total = total_result[0]["total"] if total_result else 0

        # Format the documents
        mix_list = []
        for doc in docs:
            item = format_mix_item(doc)
            mix_list.append(item)

        return {
            "success": True,
            "data": mix_list,
            "pagination": {
                "page": page,
                "limit": limit,
                "total": total,
                "pages": (total + limit - 1) // limit,
                "has_next": page * limit < total,
                "has_prev": page > 1
            },
            "sort_by": sort_by,
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        logging.error(f"获取合集列表失败: {e}")
        return {"success": False, "message": f"获取数据失败: {str(e)}"}


def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None):
    """Get the mix list sorted by play-count growth."""
    try:
        # Number of documents to skip
        skip = (page - 1) * limit

        # Default to yesterday/today if no dates were provided
        if not start_date or not end_date:
            end_date = datetime.now().date()
            start_date = end_date - timedelta(days=1)
        else:
            # Convert string dates into date objects
            if isinstance(start_date, str):
                start_date = datetime.strptime(start_date, "%Y-%m-%d").date()
            if isinstance(end_date, str):
                end_date = datetime.strptime(end_date, "%Y-%m-%d").date()

        # Records for the end date
        end_cursor = collection.find({
            "batch_time": {
                "$gte": datetime(end_date.year, end_date.month, end_date.day),
                "$lt": datetime(end_date.year, end_date.month, end_date.day) + timedelta(days=1)
            }
        })
        end_data = list(end_cursor)

        # Records for the start date
        start_cursor = collection.find({
            "batch_time": {
                "$gte": datetime(start_date.year, start_date.month, start_date.day),
                "$lt": datetime(start_date.year, start_date.month, start_date.day) + timedelta(days=1)
            }
        })
        start_data = list(start_cursor)

        # Index by mix name for fast lookups
        end_dict = {item["mix_name"]: item for item in end_data}
        start_dict = {item["mix_name"]: item for item in start_data}

        # Compute growth per mix
        growth_data = []
        for mix_name, end_item in end_dict.items():
            if mix_name in start_dict:
                start_item = start_dict[mix_name]
                growth = end_item.get("play_vv", 0) - start_item.get("play_vv", 0)
                # Keep only mixes with positive growth
                if growth > 0:
                    item = format_mix_item(end_item)
                    item["growth"] = growth
                    item["start_date"] = start_date.strftime("%Y-%m-%d")
                    item["end_date"] = end_date.strftime("%Y-%m-%d")
                    growth_data.append(item)
            else:
                # No record on the start date but one on the end date:
                # count the whole play count as new growth
                item = format_mix_item(end_item)
                item["growth"] = end_item.get("play_vv", 0)
                item["start_date"] = start_date.strftime("%Y-%m-%d")
                item["end_date"] = end_date.strftime("%Y-%m-%d")
                growth_data.append(item)

        # Sort by growth, descending
        growth_data.sort(key=lambda x: x.get("growth", 0), reverse=True)

        # Paginate
        total = len(growth_data)
        paginated_data = growth_data[skip:skip + limit]

        # Assign ranks
        for i, item in enumerate(paginated_data):
            item["rank"] = skip + i + 1

        return {
            "success": True,
            "data": paginated_data,
            "pagination": {
                "page": page,
                "limit": limit,
                "total": total,
                "pages": (total + limit - 1) // limit,
                "has_next": page * limit < total,
                "has_prev": page > 1
            },
            "sort_by": "growth",
            "date_range": {
                "start_date": start_date.strftime("%Y-%m-%d"),
                "end_date": end_date.strftime("%Y-%m-%d")
            },
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        logging.error(f"获取增长合集列表失败: {e}")
        # Fall back to play-count sorting if the growth calculation fails
        return get_mix_list(page, limit, "playcount")

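# Worked example of the growth rule above (figures are invented for
# illustration, not taken from the Rankings_list collection):
#
#   start_dict = {"示例短剧A": {"play_vv": 40_000_000}}
#   end_dict   = {"示例短剧A": {"play_vv": 52_000_000},  # growth = 12_000_000
#                 "示例短剧B": {"play_vv": 8_000_000}}   # new on end_date -> growth = 8_000_000
#
# Both mixes appear in the result, ranked 1 and 2 by their growth delta;
# a mix whose play_vv did not rise between the two dates is dropped.
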
def get_top_mixes(limit=10):
    """Get the hottest mixes (TOP list)."""
    try:
        # Sort by play count to get the hottest mixes
        cursor = collection.find().sort("play_vv", -1).limit(limit)
        docs = list(cursor)

        if not docs:
            return {"success": False, "message": "暂无数据"}

        # Format the documents
        top_list = []
        for doc in docs:
            item = format_mix_item(doc)
            top_list.append(item)

        return {
            "success": True,
            "data": top_list,
            "total": len(top_list),
            "update_time": format_time(docs[0].get("batch_time")) if docs else ""
        }
    except Exception as e:
        logging.error(f"获取热门合集失败: {e}")
        return {"success": False, "message": f"获取数据失败: {str(e)}"}


def search_mixes(keyword, page=1, limit=10):
    """Search mixes by name."""
    try:
        if not keyword:
            return {"success": False, "message": "请提供搜索关键词"}

        # Number of documents to skip
        skip = (page - 1) * limit

        # Fuzzy, case-insensitive match on the mix name
        search_condition = {
            "mix_name": {"$regex": keyword, "$options": "i"}
        }

        # Query the matching documents
        cursor = collection.find(search_condition).sort("play_vv", -1).skip(skip).limit(limit)
        docs = list(cursor)

        # Total number of matches
        total = collection.count_documents(search_condition)

        # Format the documents
        search_results = []
        for doc in docs:
            item = format_mix_item(doc)
            search_results.append(item)

        return {
            "success": True,
            "data": search_results,
            "keyword": keyword,
            "pagination": {
                "page": page,
                "limit": limit,
                "total": total,
                "pages": (total + limit - 1) // limit,
                "has_next": page * limit < total,
                "has_prev": page > 1
            },
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        logging.error(f"搜索合集失败: {e}")
        return {"success": False, "message": f"搜索失败: {str(e)}"}


def get_mix_detail(mix_id):
    """Get the details of a single mix."""
    try:
        from bson import ObjectId

        # Try to look the document up by ObjectId first
        try:
            doc = collection.find_one({"_id": ObjectId(mix_id)})
        except Exception:
            # Fall back to other fields when the ObjectId is invalid
            doc = collection.find_one({
                "$or": [
                    {"mix_name": mix_id},
                    {"request_id": mix_id}
                ]
            })

        if not doc:
            return {"success": False, "message": "未找到合集信息"}

        # Return only the raw database fields
        detail = format_mix_item(doc)

        return {
            "success": True,
            "data": detail,
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        logging.error(f"获取合集详情失败: {e}")
        return {"success": False, "message": f"获取详情失败: {str(e)}"}


def get_statistics():
    """Get aggregate statistics."""
    try:
        # Basic counts
        total_mixes = collection.count_documents({})

        if total_mixes == 0:
            return {"success": False, "message": "暂无数据"}

        # Play-count statistics
        pipeline = [
            {
                "$group": {
                    "_id": None,
                    "total_playcount": {"$sum": "$play_vv"},
                    "avg_playcount": {"$avg": "$play_vv"},
                    "max_playcount": {"$max": "$play_vv"},
                    "min_playcount": {"$min": "$play_vv"}
                }
            }
        ]
        stats_result = list(collection.aggregate(pipeline))
        stats = stats_result[0] if stats_result else {}

        # Most recent update time
        latest_doc = collection.find_one(sort=[("batch_time", -1)])
        latest_time = format_time(latest_doc.get("batch_time")) if latest_doc else ""

        # Popularity buckets by play-count range
        categories = [
            {"name": "超热门", "min": 100000000, "count": 0},                # 100M+
            {"name": "热门", "min": 50000000, "max": 99999999, "count": 0},  # 50M-100M
            {"name": "中等", "min": 10000000, "max": 49999999, "count": 0},  # 10M-50M
            {"name": "一般", "min": 0, "max": 9999999, "count": 0}           # below 10M
        ]

        for category in categories:
            if "max" in category:
                count = collection.count_documents({
                    "play_vv": {"$gte": category["min"], "$lte": category["max"]}
                })
            else:
                count = collection.count_documents({
                    "play_vv": {"$gte": category["min"]}
                })
            category["count"] = count

        return {
            "success": True,
            "data": {
                "total_mixes": total_mixes,
                "total_playcount": stats.get("total_playcount", 0),
                "avg_playcount": int(stats.get("avg_playcount", 0)),
                "max_playcount": stats.get("max_playcount", 0),
                "min_playcount": stats.get("min_playcount", 0),
                "categories": categories,
                "latest_update": latest_time
            },
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        logging.error(f"获取统计信息失败: {e}")
        return {"success": False, "message": f"获取统计失败: {str(e)}"}


# Route definitions
@rank_bp.route('/videos')
def get_videos():
    """Get the mix list - called from app.py."""
    page = int(request.args.get('page', 1))
    limit = int(request.args.get('limit', 20))
    sort_by = request.args.get('sort', 'playcount')

    if sort_by == 'growth':
        start_date = request.args.get('start_date')
        end_date = request.args.get('end_date')
        result = get_growth_mixes(page, limit, start_date, end_date)
    else:
        result = get_mix_list(page, limit, sort_by)

    return jsonify(result)


@rank_bp.route('/top')
def get_top():
    """Get the top list - called from app.py."""
    limit = int(request.args.get('limit', 10))
    result = get_top_mixes(limit)
    return jsonify(result)


@rank_bp.route('/search')
def search():
    """Search mixes - called from app.py."""
    keyword = request.args.get('q', '')
    page = int(request.args.get('page', 1))
    limit = int(request.args.get('limit', 10))
    result = search_mixes(keyword, page, limit)
    return jsonify(result)

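# Example requests for the routes above (paths assume this blueprint's
# /api/rank prefix; host and port depend on how app.py serves the app,
# and the query values are placeholders):
#
#   GET /api/rank/videos?page=1&limit=20&sort=playcount
#   GET /api/rank/videos?sort=growth&start_date=2024-01-01&end_date=2024-01-02
#   GET /api/rank/top?limit=10
#   GET /api/rank/search?q=短剧&page=1&limit=10
#
# Each endpoint returns JSON shaped like
#   {"success": true, "data": [...], "pagination": {...}, "update_time": "..."}
# (the /top endpoint returns "total" instead of "pagination").
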
@rank_bp.route('/detail')
def get_detail():
    """Get mix details - called from app.py."""
    mix_id = request.args.get('id', '')
    result = get_mix_detail(mix_id)
    return jsonify(result)


@rank_bp.route('/stats')
def get_stats():
    """Get statistics - called from app.py."""
    result = get_statistics()
    return jsonify(result)


@rank_bp.route('/health')
def health_check():
    """Health check - called from app.py."""
    try:
        from database import client

        # Make sure the database client exists
        if client is None:
            return jsonify({"success": False, "message": "数据库未连接"})

        # Verify the connection with a ping
        client.admin.command('ping')

        # Basic record count
        total_count = collection.count_documents({})

        return jsonify({
            "success": True,
            "message": "服务正常",
            "data": {
                "database": "连接正常",
                "total_records": total_count,
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
        })
    except Exception as e:
        logging.error(f"健康检查失败: {e}")
        return jsonify({"success": False, "message": f"服务异常: {str(e)}"})
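
# --- Local smoke test (illustrative sketch only; the real wiring lives in
# --- app.py, which is assumed to register this blueprint itself) ---
if __name__ == "__main__":
    from flask import Flask

    # Mount the blueprint on a throwaway app so the routes above can be
    # exercised with a browser or curl; host/port are arbitrary, e.g.:
    #   GET http://127.0.0.1:5000/api/rank/health
    #   GET http://127.0.0.1:5000/api/rank/stats
    #   GET http://127.0.0.1:5000/api/rank/detail?id=<ObjectId or mix_name>
    _app = Flask(__name__)
    _app.register_blueprint(rank_bp)
    _app.run(host="127.0.0.1", port=5000, debug=True)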