#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Quick MongoDB inspection tool.

Prints, in a single run, the structure of every non-system database, summary
statistics for one collection, and that collection's most recent documents.
"""

import json
from collections import defaultdict
from datetime import datetime
from typing import Any, Dict, Optional

import pymongo
from pymongo import MongoClient

# Databases that MongoDB maintains itself; skipped in the structure report.
SYSTEM_DATABASES = ('admin', 'local', 'config')

# Candidate field names, in priority order, that may hold a record timestamp.
TIME_FIELDS = ('batch_time', 'created_at', 'timestamp')

# Candidate field names, in priority order, that may hold a play/view counter.
PLAY_COUNT_FIELDS = ('play_vv', 'playcount', 'play_count', 'views')


def _shorten(text: str, limit: int) -> str:
    """Return *text* cut to *limit* characters, with '...' appended if cut."""
    return text[:limit] + '...' if len(text) > limit else text


def _format_time(value: Any, fmt: str = '%Y-%m-%d %H:%M:%S') -> str:
    """Format *value* with *fmt* when it is a datetime, else fall back to str().

    The time fields are only tested for existence before use, so they may hold
    strings or epoch numbers; calling ``strftime`` blindly would raise.
    """
    if isinstance(value, datetime):
        return value.strftime(fmt)
    return str(value)


def connect_mongodb(connection_string='mongodb://localhost:27017/') -> Optional[MongoClient]:
    """Connect to MongoDB and verify the server answers a ping.

    Args:
        connection_string: MongoDB URI to connect to.

    Returns:
        A connected ``MongoClient``, or ``None`` when the connection or the
        ping fails (the error is printed, not raised).
    """
    client = None
    try:
        client = MongoClient(connection_string, serverSelectionTimeoutMS=5000)
        # The constructor does not block on the server; the ping forces a
        # round trip so an unreachable server is detected here.
        client.admin.command('ping')
    except Exception as e:
        print(f"❌ 连接MongoDB失败: {e}")
        # Release the half-initialised client instead of leaking it.
        if client is not None:
            client.close()
        return None

    print(f"✅ 成功连接到MongoDB: {connection_string}")
    return client


def analyze_document_schema(document) -> Dict[str, Dict[str, Any]]:
    """Describe the top-level fields of a single document.

    Args:
        document: A mapping of field name to value; may be ``None`` or empty.

    Returns:
        A dict mapping each field name to a descriptor containing ``type``
        and ``example`` (plus ``length`` for arrays and ``keys`` for objects).
        An empty dict is returned for a missing or empty document.
    """
    if not document:
        return {}

    schema = {}
    for key, value in document.items():
        if key == '_id':
            info = {'type': 'ObjectId', 'example': str(value)}
        elif isinstance(value, bool):
            # Must precede the int test: bool is a subclass of int, so testing
            # int first would report every boolean as an integer.
            info = {'type': 'boolean', 'example': value}
        elif isinstance(value, str):
            info = {'type': 'string', 'example': _shorten(value, 50)}
        elif isinstance(value, int):
            info = {'type': 'integer', 'example': value}
        elif isinstance(value, float):
            info = {'type': 'float', 'example': value}
        elif isinstance(value, datetime):
            info = {'type': 'datetime', 'example': value.strftime('%Y-%m-%d %H:%M:%S')}
        elif isinstance(value, list):
            info = {
                'type': 'array',
                'length': len(value),
                # Show at most three items; mark longer arrays with '...'.
                'example': value[:3] if len(value) <= 3 else value[:3] + ['...'],
            }
        elif isinstance(value, dict):
            info = {
                'type': 'object',
                'keys': list(value.keys())[:5],
                'example': dict(list(value.items())[:2]),
            }
        else:
            info = {'type': type(value).__name__, 'example': str(value)[:50]}
        schema[key] = info

    return schema


def display_database_info(client):
    """Print every non-system database, its collections and a sample schema.

    The schema of each non-empty collection is derived from a single document
    returned by ``find_one()``. Any error is printed rather than raised.
    """
    print("\n" + "=" * 80)
    print("📊 MongoDB 数据库结构分析")
    print("=" * 80)

    try:
        for db_name in client.list_database_names():
            if db_name in SYSTEM_DATABASES:
                continue

            db = client[db_name]
            collections = db.list_collection_names()

            print(f"\n🗄️ 数据库: {db_name}")
            print(f" 集合数量: {len(collections)}")

            for coll_name in collections:
                collection = db[coll_name]
                count = collection.count_documents({})

                print(f"\n 📁 集合: {coll_name}")
                print(f" 文档数量: {count:,}")

                if count <= 0:
                    print(" ⚠️ 集合为空")
                    continue

                # Use one sample document to infer the field structure.
                schema = analyze_document_schema(collection.find_one())
                if not schema:
                    continue

                print(" 📋 字段结构:")
                for field_name, field_info in schema.items():
                    print(f" • {field_name}: {field_info['type']}")
                    if 'example' in field_info:
                        example = field_info['example']
                        if isinstance(example, str):
                            example = _shorten(example, 100)
                        print(f" 示例: {example}")

    except Exception as e:
        print(f"❌ 获取数据库信息失败: {e}")


def _print_time_range(collection):
    """Print the earliest/latest value of the first time field that exists."""
    for field in TIME_FIELDS:
        if not collection.find_one({field: {'$exists': True}}):
            continue

        pipeline = [
            {'$group': {
                '_id': None,
                'min_time': {'$min': f'${field}'},
                'max_time': {'$max': f'${field}'},
            }}
        ]
        result = list(collection.aggregate(pipeline))
        if result:
            print(f"📅 时间范围 ({field}):")
            print(f" 最早: {_format_time(result[0]['min_time'])}")
            print(f" 最新: {_format_time(result[0]['max_time'])}")
        # Only the first matching field is reported.
        break


def _print_play_stats(collection):
    """Print sum/avg/max/min of the first numeric play-count field that exists."""
    for field in PLAY_COUNT_FIELDS:
        numeric = {field: {'$exists': True, '$type': 'number'}}
        if not collection.find_one(numeric):
            continue

        pipeline = [
            # Restrict to numeric values: $max/$min would otherwise be able to
            # return a non-numeric value that the ',' format below rejects.
            {'$match': {field: {'$type': 'number'}}},
            {'$group': {
                '_id': None,
                'total_plays': {'$sum': f'${field}'},
                'avg_plays': {'$avg': f'${field}'},
                'max_plays': {'$max': f'${field}'},
                'min_plays': {'$min': f'${field}'},
            }},
        ]
        result = list(collection.aggregate(pipeline))
        if result:
            stats = result[0]
            print(f"🎬 播放量统计 ({field}):")
            print(f" 总播放量: {stats['total_plays']:,}")
            print(f" 平均播放量: {stats['avg_plays']:,.0f}")
            print(f" 最高播放量: {stats['max_plays']:,}")
            print(f" 最低播放量: {stats['min_plays']:,}")
        # Only the first matching field is reported.
        break


def _print_top_content(collection):
    """Print the five documents with the highest numeric ``play_vv``."""
    if not collection.find_one({'mix_name': {'$exists': True}}):
        return

    print("\n🔥 热门内容 (按播放量排序):")
    pipeline = [
        {'$match': {'play_vv': {'$exists': True, '$type': 'number'}}},
        {'$sort': {'play_vv': -1}},
        {'$limit': 5},
        {'$project': {'mix_name': 1, 'play_vv': 1, 'batch_time': 1}},
    ]
    for i, content in enumerate(collection.aggregate(pipeline), 1):
        name = content.get('mix_name', '未知')
        plays = content.get('play_vv', 0)
        batch_time = content.get('batch_time')
        # A missing time is reported as unknown rather than as "now", which
        # would look like a real record time.
        if batch_time is None:
            time_str = '未知'
        else:
            time_str = _format_time(batch_time, '%m-%d %H:%M')
        print(f" {i}. {name}: {plays:,} ({time_str})")


def display_statistics(client, db_name='douyin_data', collection_name='play_vv_records'):
    """Print summary statistics for one collection.

    Reports the total document count, the time range of the first available
    time field, aggregate figures for the first available numeric play-count
    field, and the top five documents by ``play_vv``.

    Args:
        client: Connected ``MongoClient``.
        db_name: Database to inspect.
        collection_name: Collection to inspect.

    Any error is printed rather than raised.
    """
    try:
        collection = client[db_name][collection_name]

        print(f"\n📊 统计信息 ({db_name}.{collection_name})")
        print("-" * 50)

        total_count = collection.count_documents({})
        print(f"📈 总文档数: {total_count:,}")

        if total_count == 0:
            print("⚠️ 集合为空,无法显示统计信息")
            return

        _print_time_range(collection)
        _print_play_stats(collection)
        _print_top_content(collection)

    except Exception as e:
        print(f"❌ 获取统计信息失败: {e}")


def display_recent_data(client, db_name='douyin_data', collection_name='play_vv_records', limit=3):
    """Print the most recent documents of one collection.

    Documents are sorted descending by the first field that exists among the
    time fields and ``_id``; if none is found they are returned unsorted.

    Args:
        client: Connected ``MongoClient``.
        db_name: Database to read from.
        collection_name: Collection to read from.
        limit: Maximum number of documents to show.

    Any error is printed rather than raised.
    """
    try:
        collection = client[db_name][collection_name]

        print(f"\n📈 最近 {limit} 条数据 ({db_name}.{collection_name})")
        print("-" * 80)

        # Pick the first candidate field that at least one document carries.
        sort_field = next(
            (
                field
                for field in TIME_FIELDS + ('_id',)
                if collection.find_one({field: {'$exists': True}})
            ),
            None,
        )

        cursor = collection.find()
        if sort_field:
            cursor = cursor.sort(sort_field, -1)
        recent_docs = list(cursor.limit(limit))

        if not recent_docs:
            print("⚠️ 没有找到数据")
            return

        for i, doc in enumerate(recent_docs, 1):
            print(f"\n📄 记录 {i}:")
            display_document(doc)

    except Exception as e:
        print(f"❌ 获取最近数据失败: {e}")


def display_document(doc, indent=2):
    """Print one document, one field per line, with type-specific formatting.

    Long strings are cut at 100 characters. Lists and dicts with more than
    three entries show only their first two, followed by a remainder count.

    Args:
        doc: Mapping of field name to value.
        indent: Number of spaces to prefix every printed line with.
    """
    spaces = " " * indent

    for key, value in doc.items():
        if key == '_id':
            print(f"{spaces}🆔 {key}: {value}")
        elif isinstance(value, datetime):
            print(f"{spaces}📅 {key}: {value.strftime('%Y-%m-%d %H:%M:%S')}")
        elif isinstance(value, str):
            print(f"{spaces}📝 {key}: {_shorten(value, 100)}")
        elif isinstance(value, (int, float)):
            if key in PLAY_COUNT_FIELDS:
                # Play counters get thousands separators for readability.
                print(f"{spaces}📊 {key}: {value:,}")
            else:
                print(f"{spaces}🔢 {key}: {value}")
        elif isinstance(value, list):
            print(f"{spaces}📋 {key}: [{len(value)} 项]")
            overflow = len(value) > 3
            for item in (value[:2] if overflow else value):
                print(f"{spaces} - {_shorten(str(item), 50)}")
            if overflow:
                print(f"{spaces} ... 还有 {len(value)-2} 项")
        elif isinstance(value, dict):
            print(f"{spaces}📦 {key}: {{对象}}")
            overflow = len(value) > 3
            entries = list(value.items())
            for k, v in (entries[:2] if overflow else entries):
                print(f"{spaces} {k}: {_shorten(str(v), 50)}")
            if overflow:
                print(f"{spaces} ... 还有 {len(value)-2} 个字段")
        else:
            print(f"{spaces}❓ {key}: {value}")


def main():
    """Connect to the default server and print structure, statistics and recent data."""
    print("🚀 MongoDB 数据库快速查看工具")
    print(f"⏰ 查看时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    client = connect_mongodb()
    # Compare with None explicitly rather than relying on the client's
    # truth value.
    if client is None:
        return

    try:
        display_database_info(client)
        display_statistics(client)
        display_recent_data(client)

        print(f"\n{'='*80}")
        print("✅ 数据库查看完成!")
        print("💡 提示: 运行 'python scripts/mongodb_viewer.py' 可以使用交互式查看器")
        print("🔄 提示: 重新运行此脚本可以查看最新数据")

    except KeyboardInterrupt:
        print("\n👋 程序被用户中断")
    finally:
        client.close()


if __name__ == '__main__':
    main()