rank_backend/backend/routers/article_routes.py

268 lines
8.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
文章API服务器
提供文章列表获取和文章详情获取的接口
"""
from flask import Blueprint, request, jsonify
from datetime import datetime, timedelta
import logging
from database import db
from bson import ObjectId
# 创建蓝图
article_bp = Blueprint('article', __name__, url_prefix='/api/article')
# 获取数据库集合
articles_collection = db['articles']
def format_time(time_obj):
"""格式化时间"""
if not time_obj:
return ""
if isinstance(time_obj, datetime):
return time_obj.strftime("%Y-%m-%d %H:%M:%S")
else:
return str(time_obj)
def format_article_item(doc):
"""格式化文章数据项"""
return {
"_id": str(doc.get("_id", "")),
"title": doc.get("title", ""),
"author_id": doc.get("author_id", ""),
"cover_image": doc.get("cover_image", ""),
"status": doc.get("status", ""),
"summary": doc.get("summary", ""),
"created_at": format_time(doc.get("created_at")),
"likes": doc.get("likes", []),
"likes_count": len(doc.get("likes", []))
}
def get_article_list(page=1, limit=20, sort_by="created_at", status=None):
"""获取文章列表(分页)"""
try:
# 计算跳过的数量
skip = (page - 1) * limit
# 构建查询条件
query_condition = {}
if status:
query_condition["status"] = status
# 设置排序字段
sort_field = sort_by if sort_by in ["created_at", "title"] else "created_at"
sort_order = -1 # 降序
# 查询数据
cursor = articles_collection.find(query_condition).sort(sort_field, sort_order).skip(skip).limit(limit)
docs = list(cursor)
# 获取总数
total = articles_collection.count_documents(query_condition)
# 格式化数据
article_list = []
for doc in docs:
item = format_article_item(doc)
article_list.append(item)
return {
"success": True,
"data": article_list,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"pages": (total + limit - 1) // limit,
"has_next": page * limit < total,
"has_prev": page > 1
},
"sort_by": sort_by,
"status_filter": status,
"update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
except Exception as e:
logging.error(f"获取文章列表失败: {e}")
return {"success": False, "message": f"获取数据失败: {str(e)}"}
def search_articles(keyword, page=1, limit=10):
"""搜索文章"""
try:
if not keyword:
return {"success": False, "message": "请提供搜索关键词"}
# 计算跳过的数量
skip = (page - 1) * limit
# 构建搜索条件(模糊匹配标题和内容)
search_condition = {
"$or": [
{"title": {"$regex": keyword, "$options": "i"}},
{"content": {"$regex": keyword, "$options": "i"}},
{"summary": {"$regex": keyword, "$options": "i"}}
]
}
# 查询数据
cursor = articles_collection.find(search_condition).sort("created_at", -1).skip(skip).limit(limit)
docs = list(cursor)
# 获取搜索结果总数
total = articles_collection.count_documents(search_condition)
# 格式化数据
search_results = []
for doc in docs:
item = format_article_item(doc)
search_results.append(item)
return {
"success": True,
"data": search_results,
"keyword": keyword,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"pages": (total + limit - 1) // limit,
"has_next": page * limit < total,
"has_prev": page > 1
},
"update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
except Exception as e:
logging.error(f"搜索文章失败: {e}")
return {"success": False, "message": f"搜索失败: {str(e)}"}
def get_article_detail(article_id):
"""获取文章详情"""
try:
# 尝试通过ObjectId查找
try:
doc = articles_collection.find_one({"_id": ObjectId(article_id)})
except:
# 如果ObjectId无效尝试其他字段
doc = articles_collection.find_one({
"$or": [
{"title": article_id},
{"author_id": article_id}
]
})
if not doc:
return {"success": False, "message": "未找到文章信息"}
# 格式化详细信息
detail = format_article_item(doc)
return {
"success": True,
"data": detail,
"update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
except Exception as e:
logging.error(f"获取文章详情失败: {e}")
return {"success": False, "message": f"获取详情失败: {str(e)}"}
def get_statistics():
"""获取统计信息"""
try:
# 基本统计
total_articles = articles_collection.count_documents({})
if total_articles == 0:
return {"success": False, "message": "暂无数据"}
# 按状态统计
status_stats = []
for status in ["draft", "published", "archived"]:
count = articles_collection.count_documents({"status": status})
status_stats.append({"status": status, "count": count})
# 获取最新更新时间
latest_doc = articles_collection.find().sort("created_at", -1).limit(1)
latest_time = ""
if latest_doc:
latest_list = list(latest_doc)
if latest_list:
latest_time = format_time(latest_list[0].get("created_at"))
return {
"success": True,
"data": {
"total_articles": total_articles,
"status_stats": status_stats,
"latest_update": latest_time
},
"update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
except Exception as e:
logging.error(f"获取统计信息失败: {e}")
return {"success": False, "message": f"获取统计失败: {str(e)}"}
# 路由定义
@article_bp.route('/list')
def get_articles():
"""获取文章列表"""
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 20))
sort_by = request.args.get('sort', 'created_at')
status = request.args.get('status')
result = get_article_list(page, limit, sort_by, status)
return jsonify(result)
@article_bp.route('/search')
def search():
"""搜索文章"""
keyword = request.args.get('q', '')
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 10))
result = search_articles(keyword, page, limit)
return jsonify(result)
@article_bp.route('/detail')
def get_detail():
"""获取文章详情"""
article_id = request.args.get('id', '')
result = get_article_detail(article_id)
return jsonify(result)
@article_bp.route('/stats')
def get_stats():
"""获取统计信息"""
result = get_statistics()
return jsonify(result)
@article_bp.route('/health')
def health_check():
"""健康检查"""
try:
# 检查数据库连接
total_records = articles_collection.count_documents({})
return jsonify({
"success": True,
"message": "服务正常",
"data": {
"database": "连接正常",
"total_records": total_records,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
})
except Exception as e:
return jsonify({
"success": False,
"message": f"服务异常: {str(e)}",
"data": {
"database": "连接失败",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
})