2025-09-11 18:34:03 +08:00

106 lines
4.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from pymongo import MongoClient
from pymongo.monitoring import (
    CommandListener,
    ServerListener,
    ServerHeartbeatListener,
    ConnectionPoolListener,
    CommandSucceededEvent,
    CommandFailedEvent,
    ServerHeartbeatStartedEvent,
    ServerHeartbeatSucceededEvent,
    ServerHeartbeatFailedEvent,
    ConnectionCreatedEvent,
    ConnectionClosedEvent,
    ConnectionCheckOutStartedEvent,
    ConnectionCheckedOutEvent,
    ConnectionCheckOutFailedEvent,
    ConnectionCheckedInEvent,
)

import config
# Command event listener
class MyCommandListener(CommandListener):
    """Command listener that stays silent except for failed commands."""

    def started(self, event):
        """Do nothing when a command starts."""

    def succeeded(self, event: CommandSucceededEvent):
        """Do nothing when a command succeeds."""

    def failed(self, event: CommandFailedEvent):
        """Print the failed command's name and its ``failure`` attribute."""
        message = f"Command {event.command_name} failed with error: {event.failure}"
        print(message)
# Server event listener
class MyServerListener(ServerListener, ServerHeartbeatListener):
    """Print server open/close events and heartbeat failures to stdout.

    pymongo delivers heartbeat events only to ``ServerHeartbeatListener``
    instances, and it does so through methods named ``started``,
    ``succeeded`` and ``failed``. Subclassing ``ServerListener`` alone meant
    the ``heartbeat_*`` handlers below were never invoked. This class now
    implements both listener interfaces; the dispatch methods forward to the
    ``heartbeat_*`` handlers, which remain the place to put heartbeat logic.
    """

    # --- ServerListener callbacks -------------------------------------

    def opened(self, event):
        """Print the address of the server that was opened."""
        print(f"Server {event.server_address} opened")

    def description_changed(self, event):
        """Do nothing when a server description changes."""

    def closed(self, event):
        """Print the address of the server that was closed."""
        print(f"Server {event.server_address} closed")

    # --- ServerHeartbeatListener callbacks (called by pymongo) --------

    def started(self, event: ServerHeartbeatStartedEvent):
        """Forward a heartbeat-started event to ``heartbeat_started``."""
        self.heartbeat_started(event)

    def succeeded(self, event: ServerHeartbeatSucceededEvent):
        """Forward a heartbeat-succeeded event to ``heartbeat_succeeded``."""
        self.heartbeat_succeeded(event)

    def failed(self, event: ServerHeartbeatFailedEvent):
        """Forward a heartbeat-failed event to ``heartbeat_failed``."""
        self.heartbeat_failed(event)

    # --- Heartbeat handlers (original public names, kept for callers) --

    def heartbeat_started(self, event: ServerHeartbeatStartedEvent):
        """Do nothing when a heartbeat starts."""

    def heartbeat_succeeded(self, event: ServerHeartbeatSucceededEvent):
        """Do nothing when a heartbeat succeeds."""

    def heartbeat_failed(self, event: ServerHeartbeatFailedEvent):
        """Print the event's ``connection_id`` and ``reply`` for a failed heartbeat."""
        print(f"Heartbeat failed on server with id: {event.connection_id} with error: {event.reply}")
# Connection pool event listener
class MyPoolListener(ConnectionPoolListener):
    """Print connection-pool and connection lifecycle events to stdout.

    Check-out started, checked-out and checked-in events are deliberately
    ignored; every other event is reported with a single printed line.
    """

    # --- Pool-level events --------------------------------------------

    def pool_created(self, event):
        """Print the address a pool was created for."""
        message = f"Connection pool created for {event.address}"
        print(message)

    def pool_ready(self, event):
        """Print the address of a pool that became ready."""
        message = f"Connection pool ready for {event.address}"
        print(message)

    def pool_cleared(self, event):
        """Print the address of a pool that was cleared."""
        message = f"Connection pool cleared for {event.address}"
        print(message)

    def pool_closed(self, event):
        """Print the address of a pool that was closed."""
        message = f"Connection pool closed for {event.address}"
        print(message)

    # --- Connection-level events --------------------------------------

    def connection_created(self, event: ConnectionCreatedEvent):
        """Print the id and address of a newly created connection."""
        message = f"Connection {event.connection_id} created for {event.address}"
        print(message)

    def connection_ready(self, event):
        """Print the id and address of a connection that became ready."""
        message = f"Connection {event.connection_id} ready for {event.address}"
        print(message)

    def connection_closed(self, event: ConnectionClosedEvent):
        """Print the id, address and close reason of a closed connection."""
        message = f"Connection {event.connection_id} closed for {event.address}, reason: {event.reason}"
        print(message)

    # --- Check-out / check-in events ----------------------------------

    def connection_check_out_started(self, event: ConnectionCheckOutStartedEvent):
        """Do nothing when a check-out starts."""

    def connection_check_out_failed(self, event: ConnectionCheckOutFailedEvent):
        """Print the address and reason of a failed check-out."""
        message = f"Connection check out failed for {event.address}, reason: {event.reason}"
        print(message)

    def connection_checked_out(self, event: ConnectionCheckedOutEvent):
        """Do nothing when a connection is checked out."""

    def connection_checked_in(self, event: ConnectionCheckedInEvent):
        """Do nothing when a connection is checked in."""
# Instantiate the listeners
command_listener = MyCommandListener()
server_listener = MyServerListener()
pool_listener = MyPoolListener()
all_event_listeners = [command_listener, server_listener, pool_listener]

MONGO_URI = config.MONGO_URI
DB_NAME = config.MONGO_MAIN_DB_NAME

# Create the MongoDB client connection
try:
    # Pass the event listeners in when instantiating MongoClient
    client = MongoClient(
        MONGO_URI,
        event_listeners=all_event_listeners,
        serverSelectionTimeoutMS=5000,  # 5-second timeout
    )
    mainDB = client[DB_NAME]
    # Actively check the connection by issuing a ping command
    client.admin.command('ping')
    success_message = f"\033[92m成功连接到MongoDB: {DB_NAME},事件监听器已激活。\033[0m"
    print(success_message)
except Exception as e:
    # Best-effort startup: report the failure instead of raising, as before.
    # NOTE(review): MONGO_URI is printed verbatim; if it embeds credentials
    # they end up in the output. Consider redacting it.
    error_message = f"\033[91m数据库连接失败: {MONGO_URI}请检查MongoDB服务是否已启动。\033[0m"
    print(error_message)
    # Surface the actual cause; the generic message above only covers the
    # "service not running" case.
    print(f"\033[91m{type(e).__name__}: {e}\033[0m")