#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 文章API服务器 提供文章列表获取和文章详情获取的接口 """ from flask import Blueprint, request, jsonify from datetime import datetime, timedelta import logging from database import db from bson import ObjectId # 创建蓝图 article_bp = Blueprint('article', __name__, url_prefix='/api/article') # 获取数据库集合 articles_collection = db['articles'] def format_time(time_obj): """格式化时间""" if not time_obj: return "" if isinstance(time_obj, datetime): return time_obj.strftime("%Y-%m-%d %H:%M:%S") else: return str(time_obj) def format_article_item(doc): """格式化文章数据项""" return { "_id": str(doc.get("_id", "")), "title": doc.get("title", ""), "author_id": doc.get("author_id", ""), "cover_image": doc.get("cover_image", ""), "status": doc.get("status", ""), "summary": doc.get("summary", ""), "created_at": format_time(doc.get("created_at")), "likes": doc.get("likes", []), "likes_count": len(doc.get("likes", [])) } def get_article_list(page=1, limit=20, sort_by="created_at", status=None): """获取文章列表(分页)""" try: # 计算跳过的数量 skip = (page - 1) * limit # 构建查询条件 query_condition = {} if status: query_condition["status"] = status # 设置排序字段 sort_field = sort_by if sort_by in ["created_at", "title"] else "created_at" sort_order = -1 # 降序 # 查询数据 cursor = articles_collection.find(query_condition).sort(sort_field, sort_order).skip(skip).limit(limit) docs = list(cursor) # 获取总数 total = articles_collection.count_documents(query_condition) # 格式化数据 article_list = [] for doc in docs: item = format_article_item(doc) article_list.append(item) return { "success": True, "data": article_list, "pagination": { "page": page, "limit": limit, "total": total, "pages": (total + limit - 1) // limit, "has_next": page * limit < total, "has_prev": page > 1 }, "sort_by": sort_by, "status_filter": status, "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } except Exception as e: logging.error(f"获取文章列表失败: {e}") return {"success": False, "message": f"获取数据失败: {str(e)}"} def search_articles(keyword, page=1, limit=10): """搜索文章""" try: if not keyword: return {"success": False, "message": "请提供搜索关键词"} # 计算跳过的数量 skip = (page - 1) * limit # 构建搜索条件(模糊匹配标题和内容) search_condition = { "$or": [ {"title": {"$regex": keyword, "$options": "i"}}, {"content": {"$regex": keyword, "$options": "i"}}, {"summary": {"$regex": keyword, "$options": "i"}} ] } # 查询数据 cursor = articles_collection.find(search_condition).sort("created_at", -1).skip(skip).limit(limit) docs = list(cursor) # 获取搜索结果总数 total = articles_collection.count_documents(search_condition) # 格式化数据 search_results = [] for doc in docs: item = format_article_item(doc) search_results.append(item) return { "success": True, "data": search_results, "keyword": keyword, "pagination": { "page": page, "limit": limit, "total": total, "pages": (total + limit - 1) // limit, "has_next": page * limit < total, "has_prev": page > 1 }, "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } except Exception as e: logging.error(f"搜索文章失败: {e}") return {"success": False, "message": f"搜索失败: {str(e)}"} def get_article_detail(article_id): """获取文章详情""" try: # 尝试通过ObjectId查找 try: doc = articles_collection.find_one({"_id": ObjectId(article_id)}) except: # 如果ObjectId无效,尝试其他字段 doc = articles_collection.find_one({ "$or": [ {"title": article_id}, {"author_id": article_id} ] }) if not doc: return {"success": False, "message": "未找到文章信息"} # 格式化详细信息 detail = format_article_item(doc) return { "success": True, "data": detail, "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } except Exception as e: logging.error(f"获取文章详情失败: {e}") return {"success": False, "message": f"获取详情失败: {str(e)}"} def get_statistics(): """获取统计信息""" try: # 基本统计 total_articles = articles_collection.count_documents({}) if total_articles == 0: return {"success": False, "message": "暂无数据"} # 按状态统计 status_stats = [] for status in ["draft", "published", "archived"]: count = articles_collection.count_documents({"status": status}) status_stats.append({"status": status, "count": count}) # 获取最新更新时间 latest_doc = articles_collection.find().sort("created_at", -1).limit(1) latest_time = "" if latest_doc: latest_list = list(latest_doc) if latest_list: latest_time = format_time(latest_list[0].get("created_at")) return { "success": True, "data": { "total_articles": total_articles, "status_stats": status_stats, "latest_update": latest_time }, "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } except Exception as e: logging.error(f"获取统计信息失败: {e}") return {"success": False, "message": f"获取统计失败: {str(e)}"} # 路由定义 @article_bp.route('/list') def get_articles(): """获取文章列表""" page = int(request.args.get('page', 1)) limit = int(request.args.get('limit', 20)) sort_by = request.args.get('sort', 'created_at') status = request.args.get('status') result = get_article_list(page, limit, sort_by, status) return jsonify(result) @article_bp.route('/search') def search(): """搜索文章""" keyword = request.args.get('q', '') page = int(request.args.get('page', 1)) limit = int(request.args.get('limit', 10)) result = search_articles(keyword, page, limit) return jsonify(result) @article_bp.route('/detail') def get_detail(): """获取文章详情""" article_id = request.args.get('id', '') result = get_article_detail(article_id) return jsonify(result) @article_bp.route('/stats') def get_stats(): """获取统计信息""" result = get_statistics() return jsonify(result) @article_bp.route('/health') def health_check(): """健康检查""" try: # 检查数据库连接 total_records = articles_collection.count_documents({}) return jsonify({ "success": True, "message": "服务正常", "data": { "database": "连接正常", "total_records": total_records, "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } }) except Exception as e: return jsonify({ "success": False, "message": f"服务异常: {str(e)}", "data": { "database": "连接失败", "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } })