#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 小程序专用抖音播放量数据API服务器 优化的数据格式和接口设计,专为小程序使用 """ from flask import Flask, jsonify, request from flask_cors import CORS from pymongo import MongoClient from datetime import datetime, timedelta import logging import os import re # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('miniprogram_api.log', encoding='utf-8'), logging.StreamHandler() ] ) app = Flask(__name__) CORS(app) # 允许跨域访问,支持小程序调用 class MiniprogramAPI: def __init__(self): self.client = None self.db = None self.collection = None self.connect_mongodb() def connect_mongodb(self): """连接MongoDB数据库""" try: self.client = MongoClient('mongodb://localhost:27017/') # 测试连接 self.client.admin.command('ping') # 使用数据库与集合 self.db = self.client['douyin_data'] self.collection = self.db['play_vv_records'] logging.info("MongoDB连接成功") return True except Exception as e: logging.error(f"MongoDB连接失败: {e}") return False def format_playcount(self, playcount_str): """格式化播放量字符串为数字""" if not playcount_str: return 0 try: if isinstance(playcount_str, (int, float)): return int(playcount_str) playcount_str = str(playcount_str).strip() # 处理亿、万等单位 if "亿" in playcount_str: num = float(re.findall(r'[\d.]+', playcount_str)[0]) return int(num * 100000000) elif "万" in playcount_str: num = float(re.findall(r'[\d.]+', playcount_str)[0]) return int(num * 10000) else: # 尝试直接转换数字 return int(float(playcount_str)) except: return 0 def format_cover_url(self, cover_data): """格式化封面图片URL""" if not cover_data: return "" if isinstance(cover_data, str): return cover_data elif isinstance(cover_data, dict) and 'url_list' in cover_data: return cover_data['url_list'][0] if cover_data['url_list'] else "" else: return "" def format_time(self, time_obj): """格式化时间""" if not time_obj: return "" if isinstance(time_obj, datetime): return time_obj.strftime("%Y-%m-%d %H:%M:%S") else: return str(time_obj) def format_video_item(self, doc): """格式化单个视频数据项 - 完全按照数据库原始字段返回""" return { "_id": str(doc.get("_id", "")), "batch_time": self.format_time(doc.get("batch_time")), "mix_name": doc.get("mix_name", ""), "video_url": doc.get("video_url", ""), "playcount": doc.get("playcount", ""), "play_vv": doc.get("play_vv", 0), "request_id": doc.get("request_id", ""), "rank": doc.get("rank", 0), "aweme_ids": doc.get("aweme_ids", []), "cover_image_url": doc.get("cover_image_url", ""), "cover_backup_urls": doc.get("cover_backup_urls", []) } def get_video_list(self, page=1, limit=20, sort_by="playcount"): """获取视频列表(分页)""" try: # 计算跳过的数量 skip = (page - 1) * limit # 设置排序字段 if sort_by == "growth": # 按增长排序需要特殊处理 return self.get_growth_videos(page, limit) else: sort_field = "play_vv" if sort_by == "playcount" else "batch_time" sort_order = -1 # 降序 # 获取今天的日期 today = datetime.now().date() # 只查询今天的数据 query_condition = { "batch_time": { "$gte": datetime(today.year, today.month, today.day), "$lt": datetime(today.year, today.month, today.day) + timedelta(days=1) } } # 查询数据并按短剧名称分组,取每个短剧的最新记录 pipeline = [ {"$match": query_condition}, {"$sort": {"batch_time": -1}}, # 按时间倒序 {"$group": { "_id": "$mix_name", # 按短剧名称分组 "latest_doc": {"$first": "$$ROOT"} # 取每个分组的第一条记录(最新记录) }}, {"$replaceRoot": {"newRoot": "$latest_doc"}}, {"$sort": {sort_field: sort_order}}, {"$skip": skip}, {"$limit": limit} ] docs = list(self.collection.aggregate(pipeline)) # 获取总数 total_pipeline = [ {"$match": query_condition}, {"$sort": {"batch_time": -1}}, {"$group": {"_id": "$mix_name"}}, {"$count": "total"} ] total_result = list(self.collection.aggregate(total_pipeline)) total = total_result[0]["total"] if total_result else 0 # 格式化数据 video_list = [] for doc in docs: item = self.format_video_item(doc) video_list.append(item) return { "success": True, "data": video_list, "pagination": { "page": page, "limit": limit, "total": total, "pages": (total + limit - 1) // limit, "has_next": page * limit < total, "has_prev": page > 1 }, "sort_by": sort_by, "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } except Exception as e: logging.error(f"获取视频列表失败: {e}") return {"success": False, "message": f"获取数据失败: {str(e)}"} def get_growth_videos(self, page=1, limit=20, start_date=None, end_date=None): """获取按播放量增长排序的视频列表""" try: # 计算跳过的数量 skip = (page - 1) * limit # 如果没有提供日期,默认使用今天和昨天 if not start_date or not end_date: end_date = datetime.now().date() start_date = end_date - timedelta(days=1) else: # 转换字符串日期为datetime对象 if isinstance(start_date, str): start_date = datetime.strptime(start_date, "%Y-%m-%d").date() if isinstance(end_date, str): end_date = datetime.strptime(end_date, "%Y-%m-%d").date() # 查询结束日期的数据 end_cursor = self.collection.find({ "batch_time": { "$gte": datetime(end_date.year, end_date.month, end_date.day), "$lt": datetime(end_date.year, end_date.month, end_date.day) + timedelta(days=1) } }) end_data = list(end_cursor) # 查询开始日期的数据 start_cursor = self.collection.find({ "batch_time": { "$gte": datetime(start_date.year, start_date.month, start_date.day), "$lt": datetime(start_date.year, start_date.month, start_date.day) + timedelta(days=1) } }) start_data = list(start_cursor) # 创建字典以便快速查找 end_dict = {item["mix_name"]: item for item in end_data} start_dict = {item["mix_name"]: item for item in start_data} # 计算增长数据 growth_data = [] for mix_name, end_item in end_dict.items(): if mix_name in start_dict: start_item = start_dict[mix_name] growth = end_item.get("play_vv", 0) - start_item.get("play_vv", 0) # 只保留增长为正的数据 if growth > 0: item = self.format_video_item(end_item) item["growth"] = growth item["start_date"] = start_date.strftime("%Y-%m-%d") item["end_date"] = end_date.strftime("%Y-%m-%d") growth_data.append(item) else: # 如果开始日期没有数据,但结束日期有,也认为是新增长 item = self.format_video_item(end_item) item["growth"] = end_item.get("play_vv", 0) item["start_date"] = start_date.strftime("%Y-%m-%d") item["end_date"] = end_date.strftime("%Y-%m-%d") growth_data.append(item) # 按增长值降序排序 growth_data.sort(key=lambda x: x.get("growth", 0), reverse=True) # 分页处理 total = len(growth_data) paginated_data = growth_data[skip:skip + limit] # 添加排名 for i, item in enumerate(paginated_data): item["rank"] = skip + i + 1 return { "success": True, "data": paginated_data, "pagination": { "page": page, "limit": limit, "total": total, "pages": (total + limit - 1) // limit, "has_next": page * limit < total, "has_prev": page > 1 }, "sort_by": "growth", "date_range": { "start_date": start_date.strftime("%Y-%m-%d"), "end_date": end_date.strftime("%Y-%m-%d") }, "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } except Exception as e: logging.error(f"获取增长视频列表失败: {e}") # 如果增长计算失败,返回按播放量排序的数据作为备选 return self.get_video_list(page, limit, "playcount") def get_top_videos(self, limit=10): """获取热门视频(TOP榜单)""" try: # 按播放量排序获取热门视频 cursor = self.collection.find().sort("play_vv", -1).limit(limit) docs = list(cursor) if not docs: return {"success": False, "message": "暂无数据"} # 格式化数据 top_list = [] for doc in docs: item = self.format_video_item(doc) top_list.append(item) return { "success": True, "data": top_list, "total": len(top_list), "update_time": self.format_time(docs[0].get("batch_time")) if docs else "" } except Exception as e: logging.error(f"获取热门视频失败: {e}") return {"success": False, "message": f"获取数据失败: {str(e)}"} def search_videos(self, keyword, page=1, limit=10): """搜索视频""" try: if not keyword: return {"success": False, "message": "请提供搜索关键词"} # 计算跳过的数量 skip = (page - 1) * limit # 构建搜索条件(模糊匹配合集名称) search_condition = { "mix_name": {"$regex": keyword, "$options": "i"} } # 查询数据 cursor = self.collection.find(search_condition).sort("play_vv", -1).skip(skip).limit(limit) docs = list(cursor) # 获取搜索结果总数 total = self.collection.count_documents(search_condition) # 格式化数据 search_results = [] for doc in docs: item = self.format_video_item(doc) search_results.append(item) return { "success": True, "data": search_results, "keyword": keyword, "pagination": { "page": page, "limit": limit, "total": total, "pages": (total + limit - 1) // limit, "has_next": page * limit < total, "has_prev": page > 1 }, "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } except Exception as e: logging.error(f"搜索视频失败: {e}") return {"success": False, "message": f"搜索失败: {str(e)}"} def get_video_detail(self, video_id): """获取视频详情""" try: from bson import ObjectId # 尝试通过ObjectId查找 try: doc = self.collection.find_one({"_id": ObjectId(video_id)}) except: # 如果ObjectId无效,尝试其他字段 doc = self.collection.find_one({ "$or": [ {"mix_name": video_id}, {"request_id": video_id} ] }) if not doc: return {"success": False, "message": "未找到视频信息"} # 格式化详细信息 - 只返回数据库原始字段 detail = self.format_video_item(doc) return { "success": True, "data": detail, "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } except Exception as e: logging.error(f"获取视频详情失败: {e}") return {"success": False, "message": f"获取详情失败: {str(e)}"} def get_statistics(self): """获取统计信息""" try: # 基本统计 total_videos = self.collection.count_documents({}) if total_videos == 0: return {"success": False, "message": "暂无数据"} # 播放量统计 pipeline = [ { "$group": { "_id": None, "total_playcount": {"$sum": "$play_vv"}, "avg_playcount": {"$avg": "$play_vv"}, "max_playcount": {"$max": "$play_vv"}, "min_playcount": {"$min": "$play_vv"} } } ] stats_result = list(self.collection.aggregate(pipeline)) stats = stats_result[0] if stats_result else {} # 获取最新更新时间 latest_doc = self.collection.find().sort("batch_time", -1).limit(1) latest_time = "" if latest_doc: latest_list = list(latest_doc) if latest_list: latest_time = self.format_time(latest_list[0].get("batch_time")) # 热门分类统计(按播放量区间) categories = [ {"name": "超热门", "min": 100000000, "count": 0}, # 1亿+ {"name": "热门", "min": 50000000, "max": 99999999, "count": 0}, # 5000万-1亿 {"name": "中等", "min": 10000000, "max": 49999999, "count": 0}, # 1000万-5000万 {"name": "一般", "min": 0, "max": 9999999, "count": 0} # 1000万以下 ] for category in categories: if "max" in category: count = self.collection.count_documents({ "play_vv": {"$gte": category["min"], "$lte": category["max"]} }) else: count = self.collection.count_documents({ "play_vv": {"$gte": category["min"]} }) category["count"] = count return { "success": True, "data": { "total_videos": total_videos, "total_playcount": stats.get("total_playcount", 0), "avg_playcount": int(stats.get("avg_playcount", 0)), "max_playcount": stats.get("max_playcount", 0), "min_playcount": stats.get("min_playcount", 0), "categories": categories, "latest_update": latest_time }, "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } except Exception as e: logging.error(f"获取统计信息失败: {e}") return {"success": False, "message": f"获取统计失败: {str(e)}"} # 创建API实例 api = MiniprogramAPI() # API路由定义 @app.route('/') def index(): """API首页""" return jsonify({ "name": "小程序抖音播放量数据API", "version": "2.0", "description": "专为小程序优化的抖音播放量数据接口", "endpoints": { "/api/videos": "获取视频列表 (支持分页和排序)", "/api/top": "获取热门视频榜单", "/api/search": "搜索视频", "/api/detail": "获取视频详情", "/api/stats": "获取统计信息", "/api/health": "健康检查" }, "features": [ "分页支持", "多种排序方式", "搜索功能", "详情查看", "统计分析", "小程序优化" ] }) @app.route('/api/videos') def get_videos(): """获取视频列表""" page = request.args.get('page', 1, type=int) limit = request.args.get('limit', 20, type=int) sort_by = request.args.get('sort', 'playcount') # playcount, time, 或 growth start_date = request.args.get('start_date', None) end_date = request.args.get('end_date', None) # 限制参数范围 page = max(1, page) limit = min(50, max(1, limit)) # 限制每页最多50条 if sort_by == "growth": # 增长排序需要特殊处理,支持日期参数 result = api.get_growth_videos(page, limit, start_date, end_date) else: result = api.get_video_list(page, limit, sort_by) return jsonify(result) @app.route('/api/top') def get_top(): """获取热门视频榜单""" limit = request.args.get('limit', 10, type=int) limit = min(50, max(1, limit)) # 限制最多50条 result = api.get_top_videos(limit) return jsonify(result) @app.route('/api/search') def search(): """搜索视频""" keyword = request.args.get('q', '').strip() page = request.args.get('page', 1, type=int) limit = request.args.get('limit', 10, type=int) # 限制参数范围 page = max(1, page) limit = min(30, max(1, limit)) # 搜索结果限制每页最多30条 result = api.search_videos(keyword, page, limit) return jsonify(result) @app.route('/api/detail') def get_detail(): """获取视频详情""" video_id = request.args.get('id', '').strip() if not video_id: return jsonify({"success": False, "message": "请提供视频ID"}) result = api.get_video_detail(video_id) return jsonify(result) @app.route('/api/stats') def get_stats(): """获取统计信息""" result = api.get_statistics() return jsonify(result) @app.route('/api/health') def health_check(): """健康检查""" try: # 检查MongoDB连接 api.client.admin.command('ping') # 获取基本信息 total_count = api.collection.count_documents({}) return jsonify({ "success": True, "status": "healthy", "mongodb": "connected", "total_records": total_count, "server_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "api_version": "2.0" }) except Exception as e: return jsonify({ "success": False, "status": "unhealthy", "mongodb": "disconnected", "error": str(e), "server_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") }) if __name__ == '__main__': print("启动小程序专用抖音播放量API服务器...") print("API地址: http://localhost:5001") print("小程序API接口列表:") print(" - GET /api/videos?page=1&limit=20&sort=playcount 获取视频列表(总播放量排序)") print(" - GET /api/videos?page=1&limit=20&sort=growth 获取视频列表(增长排序,默认昨天到今天的差值)") print(" - GET /api/videos?page=1&limit=20&sort=growth&start_date=2025-10-16&end_date=2025-10-17 获取视频列表(自定义日期范围增长排序)") print(" - GET /api/top?limit=10 获取热门榜单") print(" - GET /api/search?q=关键词&page=1&limit=10 搜索视频") print(" - GET /api/detail?id=视频ID 获取视频详情") print(" - GET /api/stats 获取统计信息") print(" - GET /api/health 健康检查") print("专为小程序优化:分页、搜索、详情、统计、增长排序、自定义日期范围") app.run(host='0.0.0.0', port=5001, debug=True)