rank_backend/scripts/miniprogram_api_server.py
2025-10-17 11:28:39 +08:00

600 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
小程序专用抖音播放量数据API服务器
优化的数据格式和接口设计,专为小程序使用
"""
from flask import Flask, jsonify, request
from flask_cors import CORS
from pymongo import MongoClient
from datetime import datetime, timedelta
import logging
import os
import re
# 配置日志
# 确保logs目录存在
import os
# Resolve "<project root>/logs", where the project root is the parent of the
# directory containing this script, and create it if it does not exist yet.
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(script_dir)
logs_dir = os.path.join(project_root, 'logs')
os.makedirs(logs_dir, exist_ok=True)

# Send every INFO-and-above record to both the log file and the console.
logging.basicConfig(
    handlers=[
        logging.FileHandler(
            os.path.join(logs_dir, 'miniprogram_api.log'),
            encoding='utf-8',
        ),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

app = Flask(__name__)
# Enable cross-origin requests so the mini-program front end can call the API.
CORS(app)
class MiniprogramAPI:
    """Query helper around the ``douyin_data.play_vv_records`` MongoDB collection.

    The query methods (``get_*`` and ``search_videos``) return plain dicts
    carrying a ``success`` flag. Failures are logged and reported through the
    returned dict rather than raised to the caller.
    """

    # Chinese magnitude suffixes understood by format_playcount, largest first.
    _UNIT_MULTIPLIERS = (("亿", 100000000), ("万", 10000))

    def __init__(self):
        self.client = None
        self.db = None
        self.collection = None
        self.connect_mongodb()

    def connect_mongodb(self):
        """Connect to the local MongoDB server and bind the records collection.

        Returns:
            True when the server answered a ping, False otherwise (the error
            is logged and ``client``/``db``/``collection`` may stay unset).
        """
        try:
            self.client = MongoClient('mongodb://localhost:27017/')
            # MongoClient connects lazily; ping so a dead server fails here.
            self.client.admin.command('ping')
            self.db = self.client['douyin_data']
            self.collection = self.db['play_vv_records']
            logging.info("MongoDB连接成功")
            return True
        except Exception as e:
            logging.error(f"MongoDB连接失败: {e}")
            return False

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _day_filter(day):
        """Return a ``batch_time`` filter covering the whole calendar day *day*."""
        day_start = datetime(day.year, day.month, day.day)
        return {
            "batch_time": {
                "$gte": day_start,
                "$lt": day_start + timedelta(days=1),
            }
        }

    @staticmethod
    def _pagination(page, limit, total):
        """Build the pagination summary shared by all list responses."""
        return {
            "page": page,
            "limit": limit,
            "total": total,
            "pages": (total + limit - 1) // limit,
            "has_next": page * limit < total,
            "has_prev": page > 1,
        }

    @staticmethod
    def _now_text():
        """Return the current local time formatted for ``update_time`` fields."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    @staticmethod
    def _search_condition(keyword):
        """Return a case-insensitive literal-substring filter on ``mix_name``.

        The keyword comes straight from the request, so it is escaped: regex
        metacharacters are matched literally instead of being interpreted.
        """
        return {"mix_name": {"$regex": re.escape(keyword), "$options": "i"}}

    @staticmethod
    def _latest_by_mix_name(docs):
        """Map each ``mix_name`` to its document with the newest ``batch_time``.

        Documents without a ``mix_name`` are skipped. On equal timestamps the
        document seen later wins.
        """
        def stamp(doc):
            return doc.get("batch_time") or datetime.min

        latest = {}
        for doc in docs:
            name = doc.get("mix_name")
            if name is None:
                continue
            current = latest.get(name)
            if current is None or stamp(doc) >= stamp(current):
                latest[name] = doc
        return latest

    # ------------------------------------------------------------------
    # Formatting helpers
    # ------------------------------------------------------------------
    def format_playcount(self, playcount_str):
        """Convert a play-count value to an integer.

        Accepts numbers as well as strings such as ``"1.5亿"``, ``"2.5万"``
        or ``"12345"``. Anything that cannot be parsed yields 0.
        """
        if not playcount_str:
            return 0
        try:
            if isinstance(playcount_str, (int, float)):
                return int(playcount_str)
            text = str(playcount_str).strip()
            for unit, multiplier in self._UNIT_MULTIPLIERS:
                if unit in text:
                    number = re.search(r'[\d.]+', text)
                    if number is None:
                        return 0
                    # round() so float error (e.g. 4.35 * 10000) cannot
                    # shave one off the result.
                    return int(round(float(number.group()) * multiplier))
            # No unit suffix: treat the text as a plain number.
            return int(float(text))
        except (ValueError, TypeError, OverflowError):
            return 0

    def format_cover_url(self, cover_data):
        """Return a cover URL from either a plain string or a ``url_list`` dict."""
        if not cover_data:
            return ""
        if isinstance(cover_data, str):
            return cover_data
        if isinstance(cover_data, dict) and 'url_list' in cover_data:
            urls = cover_data['url_list']
            return urls[0] if urls else ""
        return ""

    def format_time(self, time_obj):
        """Render a datetime as ``YYYY-MM-DD HH:MM:SS``; other values via ``str``."""
        if not time_obj:
            return ""
        if isinstance(time_obj, datetime):
            return time_obj.strftime("%Y-%m-%d %H:%M:%S")
        return str(time_obj)

    def format_video_item(self, doc):
        """Project a stored record onto the fields returned to clients.

        Values are passed through as stored, except that ``_id`` is
        stringified and ``batch_time`` is formatted as text.
        """
        return {
            "_id": str(doc.get("_id", "")),
            "batch_time": self.format_time(doc.get("batch_time")),
            "mix_name": doc.get("mix_name", ""),
            "video_url": doc.get("video_url", ""),
            "playcount": doc.get("playcount", ""),
            "play_vv": doc.get("play_vv", 0),
            "request_id": doc.get("request_id", ""),
            "rank": doc.get("rank", 0),
            "aweme_ids": doc.get("aweme_ids", []),
            "cover_image_url": doc.get("cover_image_url", ""),
            "cover_backup_urls": doc.get("cover_backup_urls", []),
        }

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    def get_video_list(self, page=1, limit=20, sort_by="playcount"):
        """Return one page of today's records, one (the newest) per ``mix_name``.

        Args:
            page: 1-based page number.
            limit: page size.
            sort_by: ``"playcount"`` sorts by ``play_vv``, ``"growth"``
                delegates to :meth:`get_growth_videos`, anything else sorts
                by ``batch_time``. Sorting is always descending.

        Returns:
            A dict with ``success``, ``data``, ``pagination``, ``sort_by`` and
            ``update_time``; on failure ``success`` is False with a ``message``.
        """
        if sort_by == "growth":
            return self.get_growth_videos(page, limit)
        try:
            page, limit = max(1, page), max(1, limit)
            skip = (page - 1) * limit
            sort_field = "play_vv" if sort_by == "playcount" else "batch_time"
            query_condition = self._day_filter(datetime.now().date())

            pipeline = [
                {"$match": query_condition},
                # Newest first so $first below picks the latest snapshot.
                {"$sort": {"batch_time": -1}},
                {"$group": {
                    "_id": "$mix_name",
                    "latest_doc": {"$first": "$$ROOT"},
                }},
                {"$replaceRoot": {"newRoot": "$latest_doc"}},
                # _id breaks ties so consecutive pages never overlap.
                {"$sort": {sort_field: -1, "_id": 1}},
                {"$skip": skip},
                {"$limit": limit},
            ]
            docs = list(self.collection.aggregate(pipeline))

            # Count distinct mix_name values; no ordering is needed for that.
            total_pipeline = [
                {"$match": query_condition},
                {"$group": {"_id": "$mix_name"}},
                {"$count": "total"},
            ]
            total_result = list(self.collection.aggregate(total_pipeline))
            total = total_result[0]["total"] if total_result else 0

            return {
                "success": True,
                "data": [self.format_video_item(doc) for doc in docs],
                "pagination": self._pagination(page, limit, total),
                "sort_by": sort_by,
                "update_time": self._now_text(),
            }
        except Exception as e:
            logging.error(f"获取视频列表失败: {e}")
            return {"success": False, "message": f"获取数据失败: {str(e)}"}

    def get_growth_videos(self, page=1, limit=20, start_date=None, end_date=None):
        """Return records ranked by ``play_vv`` growth between two days.

        Args:
            page: 1-based page number.
            limit: page size.
            start_date: ``YYYY-MM-DD`` string or date object.
            end_date: ``YYYY-MM-DD`` string or date object.

        Unless both dates are given, yesterday and today are used. For each
        day the newest snapshot per ``mix_name`` is compared. Names that exist
        on both days are kept only when growth is positive; names that exist
        only on the end day are kept with their full ``play_vv`` as growth.

        Returns:
            A dict like :meth:`get_video_list` plus ``date_range``, with a
            ``growth`` value and a recomputed ``rank`` on every item. Any
            failure (including unparsable dates) is logged and answered with
            the play-count ordered list instead.
        """
        try:
            page, limit = max(1, page), max(1, limit)
            skip = (page - 1) * limit

            if not start_date or not end_date:
                end_date = datetime.now().date()
                start_date = end_date - timedelta(days=1)
            else:
                if isinstance(start_date, str):
                    start_date = datetime.strptime(start_date, "%Y-%m-%d").date()
                if isinstance(end_date, str):
                    end_date = datetime.strptime(end_date, "%Y-%m-%d").date()

            end_dict = self._latest_by_mix_name(
                self.collection.find(self._day_filter(end_date))
            )
            start_dict = self._latest_by_mix_name(
                self.collection.find(self._day_filter(start_date))
            )

            start_text = start_date.strftime("%Y-%m-%d")
            end_text = end_date.strftime("%Y-%m-%d")

            growth_data = []
            for mix_name, end_item in end_dict.items():
                # format_playcount tolerates null / textual play_vv values.
                end_plays = self.format_playcount(end_item.get("play_vv"))
                start_item = start_dict.get(mix_name)
                if start_item is None:
                    # Not present on the start day: everything counts as growth.
                    growth = end_plays
                else:
                    growth = end_plays - self.format_playcount(start_item.get("play_vv"))
                    if growth <= 0:
                        continue
                item = self.format_video_item(end_item)
                item["growth"] = growth
                item["start_date"] = start_text
                item["end_date"] = end_text
                growth_data.append(item)

            growth_data.sort(key=lambda x: x.get("growth", 0), reverse=True)

            total = len(growth_data)
            paginated_data = growth_data[skip:skip + limit]
            # Rank reflects the position in the growth ordering, not the stored rank.
            for offset, item in enumerate(paginated_data):
                item["rank"] = skip + offset + 1

            return {
                "success": True,
                "data": paginated_data,
                "pagination": self._pagination(page, limit, total),
                "sort_by": "growth",
                "date_range": {
                    "start_date": start_text,
                    "end_date": end_text,
                },
                "update_time": self._now_text(),
            }
        except Exception as e:
            logging.error(f"获取增长视频列表失败: {e}")
            # Fall back to the play-count ordering when growth cannot be computed.
            return self.get_video_list(page, limit, "playcount")

    def get_top_videos(self, limit=10):
        """Return the *limit* records with the highest ``play_vv``.

        NOTE(review): this reads every stored snapshot, so the same
        ``mix_name`` may appear more than once if several batches are kept.
        Confirm whether de-duplication (as in get_video_list) is wanted.
        """
        try:
            docs = list(self.collection.find().sort("play_vv", -1).limit(limit))
            if not docs:
                return {"success": False, "message": "暂无数据"}
            top_list = [self.format_video_item(doc) for doc in docs]
            return {
                "success": True,
                "data": top_list,
                "total": len(top_list),
                "update_time": self.format_time(docs[0].get("batch_time")),
            }
        except Exception as e:
            logging.error(f"获取热门视频失败: {e}")
            return {"success": False, "message": f"获取数据失败: {str(e)}"}

    def search_videos(self, keyword, page=1, limit=10):
        """Search records whose ``mix_name`` contains *keyword* (case-insensitive).

        Returns:
            A paginated response dict ordered by ``play_vv`` descending, or a
            failure dict when *keyword* is empty or the query fails.
        """
        try:
            if not keyword:
                return {"success": False, "message": "请提供搜索关键词"}
            page, limit = max(1, page), max(1, limit)
            skip = (page - 1) * limit
            search_condition = self._search_condition(keyword)

            cursor = (
                self.collection.find(search_condition)
                .sort("play_vv", -1)
                .skip(skip)
                .limit(limit)
            )
            search_results = [self.format_video_item(doc) for doc in cursor]
            total = self.collection.count_documents(search_condition)

            return {
                "success": True,
                "data": search_results,
                "keyword": keyword,
                "pagination": self._pagination(page, limit, total),
                "update_time": self._now_text(),
            }
        except Exception as e:
            logging.error(f"搜索视频失败: {e}")
            return {"success": False, "message": f"搜索失败: {str(e)}"}

    def get_video_detail(self, video_id):
        """Look up one record by ``_id``, falling back to ``mix_name``/``request_id``.

        Returns:
            ``{"success": True, "data": ...}`` for a match, otherwise a
            failure dict with a ``message``.
        """
        try:
            from bson import ObjectId
            from bson.errors import InvalidId

            doc = None
            try:
                doc = self.collection.find_one({"_id": ObjectId(video_id)})
            except (InvalidId, TypeError):
                # Not an ObjectId; the identifier may be a name or request id.
                doc = None
            if doc is None:
                doc = self.collection.find_one({
                    "$or": [
                        {"mix_name": video_id},
                        {"request_id": video_id},
                    ]
                })
            if not doc:
                return {"success": False, "message": "未找到视频信息"}
            return {
                "success": True,
                "data": self.format_video_item(doc),
                "update_time": self._now_text(),
            }
        except Exception as e:
            logging.error(f"获取视频详情失败: {e}")
            return {"success": False, "message": f"获取详情失败: {str(e)}"}

    def get_statistics(self):
        """Return aggregate ``play_vv`` statistics over every stored record.

        The result contains the record count, sum/avg/max/min of ``play_vv``,
        record counts per play-count band and the newest ``batch_time``.
        """
        try:
            total_videos = self.collection.count_documents({})
            if total_videos == 0:
                return {"success": False, "message": "暂无数据"}

            pipeline = [
                {
                    "$group": {
                        "_id": None,
                        "total_playcount": {"$sum": "$play_vv"},
                        "avg_playcount": {"$avg": "$play_vv"},
                        "max_playcount": {"$max": "$play_vv"},
                        "min_playcount": {"$min": "$play_vv"},
                    }
                }
            ]
            stats_result = list(self.collection.aggregate(pipeline))
            stats = stats_result[0] if stats_result else {}

            latest_doc = self.collection.find_one(sort=[("batch_time", -1)])
            latest_time = ""
            if latest_doc is not None:
                latest_time = self.format_time(latest_doc.get("batch_time"))

            # Play-count bands: 100M+, 50M-100M, 10M-50M, below 10M.
            categories = [
                {"name": "超热门", "min": 100000000, "count": 0},
                {"name": "热门", "min": 50000000, "max": 99999999, "count": 0},
                {"name": "中等", "min": 10000000, "max": 49999999, "count": 0},
                {"name": "一般", "min": 0, "max": 9999999, "count": 0},
            ]
            for category in categories:
                bounds = {"$gte": category["min"]}
                if "max" in category:
                    bounds["$lte"] = category["max"]
                category["count"] = self.collection.count_documents({"play_vv": bounds})

            # The accumulators come back as null when no record has a numeric
            # play_vv, hence the "or 0" guards.
            return {
                "success": True,
                "data": {
                    "total_videos": total_videos,
                    "total_playcount": stats.get("total_playcount") or 0,
                    "avg_playcount": int(stats.get("avg_playcount") or 0),
                    "max_playcount": stats.get("max_playcount") or 0,
                    "min_playcount": stats.get("min_playcount") or 0,
                    "categories": categories,
                    "latest_update": latest_time,
                },
                "update_time": self._now_text(),
            }
        except Exception as e:
            logging.error(f"获取统计信息失败: {e}")
            return {"success": False, "message": f"获取统计失败: {str(e)}"}
# Module-level API instance shared by the route handlers below; constructing
# it immediately attempts the MongoDB connection.
api = MiniprogramAPI()
# API route definitions
@app.route('/')
def index():
    """Describe the API: name, version, available endpoints and features."""
    endpoint_help = {
        "/api/videos": "获取视频列表 (支持分页和排序)",
        "/api/top": "获取热门视频榜单",
        "/api/search": "搜索视频",
        "/api/detail": "获取视频详情",
        "/api/stats": "获取统计信息",
        "/api/health": "健康检查",
    }
    feature_list = [
        "分页支持",
        "多种排序方式",
        "搜索功能",
        "详情查看",
        "统计分析",
        "小程序优化",
    ]
    return jsonify({
        "name": "小程序抖音播放量数据API",
        "version": "2.0",
        "description": "专为小程序优化的抖音播放量数据接口",
        "endpoints": endpoint_help,
        "features": feature_list,
    })
@app.route('/api/videos')
def get_videos():
    """Return one page of videos.

    Query parameters: ``page`` (default 1), ``limit`` (default 20, clamped to
    1..50), ``sort`` (``playcount``, ``time`` or ``growth``) and, for growth
    sorting only, ``start_date`` / ``end_date``.
    """
    args = request.args
    # Clamp paging so the page is at least 1 and the size stays within 1..50.
    page = max(1, args.get('page', 1, type=int))
    limit = min(50, max(1, args.get('limit', 20, type=int)))
    sort_by = args.get('sort', 'playcount')
    start_date = args.get('start_date', None)
    end_date = args.get('end_date', None)

    if sort_by != "growth":
        return jsonify(api.get_video_list(page, limit, sort_by))
    # Growth ranking is the only mode that honours the date parameters.
    return jsonify(api.get_growth_videos(page, limit, start_date, end_date))
@app.route('/api/top')
def get_top():
    """Return the top-videos chart; ``limit`` defaults to 10 and is clamped to 1..50."""
    requested = request.args.get('limit', 10, type=int)
    capped = min(50, max(1, requested))
    return jsonify(api.get_top_videos(capped))
@app.route('/api/search')
def search():
    """Search videos by the ``q`` keyword.

    ``page`` defaults to 1 and ``limit`` to 10; the page size is clamped to
    1..30 for search results.
    """
    args = request.args
    keyword = args.get('q', '').strip()
    page = max(1, args.get('page', 1, type=int))
    limit = min(30, max(1, args.get('limit', 10, type=int)))
    return jsonify(api.search_videos(keyword, page, limit))
@app.route('/api/detail')
def get_detail():
    """Return the detail record for the ``id`` query parameter."""
    video_id = request.args.get('id', '').strip()
    if video_id:
        return jsonify(api.get_video_detail(video_id))
    # A blank or missing id is rejected without touching the database.
    return jsonify({"success": False, "message": "请提供视频ID"})
@app.route('/api/stats')
def get_stats():
    """Return the aggregate statistics computed by ``api.get_statistics``."""
    return jsonify(api.get_statistics())
@app.route('/api/health')
def health_check():
    """Report whether MongoDB answers a ping, plus the stored record count.

    Any exception raised while probing produces the "unhealthy" payload
    carrying the error text.
    """
    try:
        # Probe the server first, then count the stored records.
        api.client.admin.command('ping')
        total_count = api.collection.count_documents({})
        healthy = {
            "success": True,
            "status": "healthy",
            "mongodb": "connected",
            "total_records": total_count,
            "server_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "api_version": "2.0",
        }
        return jsonify(healthy)
    except Exception as e:
        unhealthy = {
            "success": False,
            "status": "unhealthy",
            "mongodb": "disconnected",
            "error": str(e),
            "server_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }
        return jsonify(unhealthy)
if __name__ == '__main__':
    # Startup banner listing the available endpoints.
    for banner_line in (
        "启动小程序专用抖音播放量API服务器...",
        "API地址: http://localhost:5001",
        "小程序API接口列表:",
        " - GET /api/videos?page=1&limit=20&sort=playcount 获取视频列表(总播放量排序)",
        " - GET /api/videos?page=1&limit=20&sort=growth 获取视频列表(增长排序,默认昨天到今天的差值)",
        " - GET /api/videos?page=1&limit=20&sort=growth&start_date=2025-10-16&end_date=2025-10-17 获取视频列表(自定义日期范围增长排序)",
        " - GET /api/top?limit=10 获取热门榜单",
        " - GET /api/search?q=关键词&page=1&limit=10 搜索视频",
        " - GET /api/detail?id=视频ID 获取视频详情",
        " - GET /api/stats 获取统计信息",
        " - GET /api/health 健康检查",
        "专为小程序优化:分页、搜索、详情、统计、增长排序、自定义日期范围",
    ):
        print(banner_line)

    # The server listens on every interface, so the interactive debugger must
    # not be on by default: it allows arbitrary code execution to anyone who
    # can reach the port. Opt in explicitly with FLASK_DEBUG=1 for local work.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(host='0.0.0.0', port=5001, debug=debug_enabled)