jimeng-free-api/src/lib/services/HeartbeatService.ts
2025-08-27 17:11:05 +08:00

274 lines
8.9 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import * as cron from 'node-cron';
import * as os from 'os';
import { v4 as uuidv4 } from 'uuid';
import JimengServer, { IJimengServer } from '@/lib/database/models/ServiceHeartbeat.ts';
import mongoDBManager from '@/lib/database/mongodb.ts';
import logger from '@/lib/logger.ts';
import config from '@/lib/config.ts';
import environment from '@/lib/environment.ts';
/**
 * Registers this process in the `JimengServer` collection and keeps the record
 * fresh by periodically writing a heartbeat timestamp (Unix seconds).
 *
 * The class is a process-wide singleton; obtain it through
 * {@link HeartbeatService.getInstance}. The static query helpers
 * (`getActiveServers`, `getOnlineServers`, `cleanupOfflineServers`) operate on
 * the whole collection and do not require the service to be started.
 */
export class HeartbeatService {
  /** Heartbeat period, in seconds, used when no valid value is configured. */
  private static readonly DEFAULT_HEARTBEAT_INTERVAL = 60;

  private static instance: HeartbeatService;

  private serverId: string;
  private serverName: string;
  private baseUrl: string;
  private heartbeatTask: cron.ScheduledTask | null = null;
  private isRunning: boolean = false;
  /** Prevents stacking duplicate SIGINT/SIGTERM listeners across start/stop cycles. */
  private signalHandlersInstalled: boolean = false;

  private constructor() {
    // Environment variables take precedence over the config file; the literal is the last resort.
    this.serverId = process.env.SERVICE_ID || config.service?.name || 'jimeng-free-api';
    this.serverName = process.env.SERVICE_NAME || this.serverId;
    this.baseUrl = this.buildBaseUrl();
  }

  /**
   * Returns the shared instance, creating it on first use.
   */
  public static getInstance(): HeartbeatService {
    if (!HeartbeatService.instance) {
      HeartbeatService.instance = new HeartbeatService();
    }
    return HeartbeatService.instance;
  }

  /**
   * Builds a six-field (seconds-first) cron pattern for the given period.
   *
   * The seconds field only covers 0-59, so a step of 60 or more cannot be
   * expressed there. Periods of a minute or longer are therefore scheduled on
   * the minute field instead, rounded down to whole minutes and capped at 59 so
   * the step stays inside the field's range. Rounding down means heartbeats are
   * never sent less often than the advertised interval.
   *
   * @param intervalSeconds Positive integer number of seconds.
   * @returns A cron pattern such as `*&#47;30 * * * * *` or `0 *&#47;2 * * * *`.
   */
  private static buildCronExpression(intervalSeconds: number): string {
    if (intervalSeconds < 60) {
      return `*/${intervalSeconds} * * * * *`;
    }
    const minutes = Math.min(59, Math.floor(intervalSeconds / 60));
    return `0 */${minutes} * * * *`;
  }

  /**
   * Resolves the heartbeat period in seconds.
   *
   * Order of precedence: `HEARTBEAT_INTERVAL` env var, then
   * `config.heartbeat.interval`, then the 60-second default (the original
   * comment notes this matches the companion Python project's default).
   * Values that do not parse to a positive integer fall back to the default.
   * Both the scheduler and the registered `heartbeat_interval` use this method
   * so the stored value always describes the real schedule.
   */
  private resolveHeartbeatInterval(): number {
    const raw = process.env.HEARTBEAT_INTERVAL || config.heartbeat?.interval;
    const seconds = parseInt(String(raw), 10);
    return Number.isFinite(seconds) && seconds > 0
      ? seconds
      : HeartbeatService.DEFAULT_HEARTBEAT_INTERVAL;
  }

  /**
   * Computes the URL under which other services should reach this server.
   *
   * Precedence: explicit `BASE_URL`; then the container name when running in
   * production with `CONTAINER_NAME` set; otherwise host and port, where a
   * wildcard host (`0.0.0.0`) is replaced by a concrete local address.
   */
  private buildBaseUrl(): string {
    const host = process.env.HOST || config.service?.host || '0.0.0.0';
    const port = process.env.PORT || config.service?.port || 3302;

    // An explicitly configured BASE_URL always wins.
    if (process.env.BASE_URL) {
      return process.env.BASE_URL;
    }

    // Docker deployment: address the server by its container name.
    if (process.env.NODE_ENV === 'production' && process.env.CONTAINER_NAME) {
      return `http://${process.env.CONTAINER_NAME}:${port}`;
    }

    // PM2 or plain Node.js start: a wildcard bind address is not reachable by
    // peers, so substitute a real local IP (or localhost as a last resort).
    let targetHost = host;
    if (host === '0.0.0.0') {
      targetHost = this.getLocalIP() || 'localhost';
    }
    return `http://${targetHost}:${port}`;
  }

  /**
   * Returns the first non-internal IPv4 address of this machine, or `null`
   * when none exists or the lookup fails.
   */
  private getLocalIP(): string | null {
    try {
      const interfaces = os.networkInterfaces();
      for (const addresses of Object.values(interfaces)) {
        // An interface entry may be undefined according to the Node typings.
        for (const iface of addresses ?? []) {
          // Some Node 18 releases reported the family as the number 4 instead of 'IPv4'.
          const family = String(iface.family);
          if ((family === 'IPv4' || family === '4') && !iface.internal) {
            return iface.address;
          }
        }
      }
    } catch (error) {
      logger.warn('Failed to get local IP:', error instanceof Error ? error.message : error);
    }
    return null;
  }

  /**
   * Registers the server, sends an immediate heartbeat and schedules the
   * recurring one. Does nothing if already running or if MongoDB is not
   * connected.
   *
   * @throws Re-throws any error raised while registering, sending the first
   *         heartbeat or scheduling the task (after logging it).
   */
  public async start(): Promise<void> {
    if (this.isRunning) {
      logger.warn('Heartbeat service is already running');
      return;
    }
    try {
      // Heartbeats are pointless without a database connection.
      if (!mongoDBManager.isMongoConnected()) {
        logger.warn('MongoDB not connected, skipping heartbeat service');
        return;
      }

      // Create or refresh this server's record, then beat once right away.
      await this.registerServer();
      await this.sendHeartbeat();

      const cronExpression = HeartbeatService.buildCronExpression(this.resolveHeartbeatInterval());
      this.heartbeatTask = cron.schedule(cronExpression, async () => {
        try {
          await this.sendHeartbeat();
        } catch (error) {
          logger.error('Heartbeat failed:', error);
        }
      }, {
        scheduled: false
      });
      this.heartbeatTask.start();
      this.isRunning = true;
      logger.success(`Heartbeat service started for server: ${this.serverId}`);

      // Install the shutdown hooks only once, even if the service is restarted.
      if (!this.signalHandlersInstalled) {
        process.on('SIGINT', () => void this.gracefulShutdown());
        process.on('SIGTERM', () => void this.gracefulShutdown());
        this.signalHandlersInstalled = true;
      }
    } catch (error) {
      logger.error('Failed to start heartbeat service:', error);
      throw error;
    }
  }

  /**
   * Stops the recurring heartbeat and marks this server inactive.
   * No-op when the service is not running.
   */
  public async stop(): Promise<void> {
    if (!this.isRunning) {
      return;
    }
    if (this.heartbeatTask) {
      this.heartbeatTask.stop();
      this.heartbeatTask = null;
    }
    // Flag the record as inactive so peers stop routing to this server.
    await this.markInactive();
    this.isRunning = false;
    logger.info('Heartbeat service stopped');
  }

  /**
   * Upserts this server's record with its current identity, URL and heartbeat
   * interval. `created_at` is written only when the record is first inserted.
   *
   * @throws Re-throws the database error after logging it.
   */
  private async registerServer(): Promise<void> {
    try {
      const currentTime = Math.floor(Date.now() / 1000);
      const serverData = {
        server_id: this.serverId,
        server_name: this.serverName,
        base_url: this.baseUrl,
        is_active: true,
        last_heartbeat: currentTime,
        heartbeat_interval: this.resolveHeartbeatInterval(),
        updated_at: currentTime
      };
      await JimengServer.findOneAndUpdate(
        { server_id: this.serverId },
        {
          $set: serverData,
          $setOnInsert: { created_at: currentTime }
        },
        { upsert: true, new: true }
      );
      logger.info(`Server registered: ${this.serverId} at ${this.baseUrl}`);
    } catch (error) {
      logger.error('Failed to register server:', error);
      throw error;
    }
  }

  /**
   * Writes the current time as this server's last heartbeat and marks it
   * active. If the record no longer exists (for example after a cleanup), the
   * server is registered again so it does not stay invisible.
   *
   * @throws Re-throws the database error after logging it.
   */
  private async sendHeartbeat(): Promise<void> {
    try {
      const currentTime = Math.floor(Date.now() / 1000);
      const existing = await JimengServer.findOneAndUpdate(
        { server_id: this.serverId },
        {
          last_heartbeat: currentTime,
          updated_at: currentTime,
          is_active: true
        }
      );
      if (!existing) {
        // Nothing matched the filter: the record is gone, so recreate it.
        logger.warn(`Heartbeat record missing for server ${this.serverId}, re-registering`);
        await this.registerServer();
        return;
      }
      logger.debug(`Heartbeat sent for server ${this.serverId}`);
    } catch (error) {
      logger.error('Failed to send heartbeat:', error);
      throw error;
    }
  }

  /**
   * Marks this server's record as inactive. Best effort: failures are logged
   * and swallowed so that shutdown can proceed.
   */
  private async markInactive(): Promise<void> {
    try {
      const currentTime = Math.floor(Date.now() / 1000);
      await JimengServer.findOneAndUpdate(
        { server_id: this.serverId },
        {
          is_active: false,
          updated_at: currentTime
        }
      );
      logger.info(`Server ${this.serverId} marked as inactive`);
    } catch (error) {
      logger.error('Failed to mark server as inactive:', error);
    }
  }

  /**
   * Signal handler: stops the service, then exits the process with code 0.
   */
  private async gracefulShutdown(): Promise<void> {
    logger.info('Received shutdown signal, stopping heartbeat service...');
    await this.stop();
    process.exit(0);
  }

  /**
   * Returns a snapshot of this server's identity and running state.
   */
  public getServerInfo() {
    return {
      serverId: this.serverId,
      serverName: this.serverName,
      baseUrl: this.baseUrl,
      isRunning: this.isRunning
    };
  }

  /**
   * Lists every server flagged active, most recent heartbeat first.
   * Returns an empty array if the query fails.
   */
  public static async getActiveServers(): Promise<IJimengServer[]> {
    try {
      return await JimengServer.find({
        is_active: true
      }).sort({ last_heartbeat: -1 });
    } catch (error) {
      logger.error('Failed to get active servers:', error);
      return [];
    }
  }

  /**
   * Lists active servers whose last heartbeat is recent enough, i.e. no older
   * than 1.5 times the server's own `heartbeat_interval`.
   * Returns an empty array if the query fails.
   */
  public static async getOnlineServers(): Promise<IJimengServer[]> {
    try {
      const currentTime = Math.floor(Date.now() / 1000);
      const timeoutFactor = 1.5; // tolerance multiplier applied to each server's interval
      const servers = await JimengServer.find({ is_active: true });
      return servers.filter(server => {
        const heartbeatTimeout = server.heartbeat_interval * timeoutFactor;
        return (currentTime - server.last_heartbeat) <= heartbeatTimeout;
      });
    } catch (error) {
      logger.error('Failed to get online servers:', error);
      return [];
    }
  }

  /**
   * Deletes records that are flagged inactive or whose last heartbeat is more
   * than 24 hours old. Failures are logged and swallowed.
   */
  public static async cleanupOfflineServers(): Promise<void> {
    try {
      const currentTime = Math.floor(Date.now() / 1000);
      const cleanupTimeout = 24 * 60 * 60; // 24 hours, in seconds
      await JimengServer.deleteMany({
        $or: [
          { is_active: false },
          { last_heartbeat: { $lt: currentTime - cleanupTimeout } }
        ]
      });
      logger.debug('Cleaned up offline servers');
    } catch (error) {
      logger.error('Failed to cleanup offline servers:', error);
    }
  }
}
// Module-level singleton: importing this file's default export yields the shared HeartbeatService instance.
const heartbeatService = HeartbeatService.getInstance();
export default heartbeatService;