2025-08-28 19:13:54 +08:00

254 lines
7.9 KiB
TypeScript

import Datastore from '@seald-io/nedb';
import logger from '@/lib/logger.js';
import path from 'path';
import fs from 'fs';
type DatastoreInstance = any;
class NeDBManager {
private static instance: NeDBManager;
private databases: Map<string, DatastoreInstance> = new Map();
private isInitialized: boolean = false;
private dataDir: string;
private constructor() {
this.dataDir = path.resolve('./data_nedb');
}
public static getInstance(): NeDBManager {
if (!NeDBManager.instance) {
NeDBManager.instance = new NeDBManager();
}
return NeDBManager.instance;
}
public async initialize(): Promise<void> {
// 如果已经初始化,直接返回
if (this.isInitialized) {
return;
}
try {
// 确保数据目录存在
if (!fs.existsSync(this.dataDir)) {
fs.mkdirSync(this.dataDir, { recursive: true });
}
// 初始化数据库
await this.initDatabase('generation_tasks', {
filename: path.join(this.dataDir, 'generation_tasks.db'),
autoload: true,
timestampData: false
});
await this.initDatabase('generation_results', {
filename: path.join(this.dataDir, 'generation_results.db'),
autoload: true,
timestampData: false
});
// 创建索引
await this.createIndexes();
this.isInitialized = true;
logger.success('NeDB initialized successfully');
} catch (error) {
logger.error('NeDB initialization failed:', error);
throw error;
}
}
private async initDatabase(name: string, options: any): Promise<void> {
return new Promise((resolve, reject) => {
const db = new (Datastore as any)(options);
db.loadDatabase((err) => {
if (err) {
reject(err);
} else {
this.databases.set(name, db);
resolve();
}
});
});
}
private async createIndexes(): Promise<void> {
const tasksDb = this.getDatabase('generation_tasks');
const resultsDb = this.getDatabase('generation_results');
// 为任务表创建索引
await this.ensureIndex(tasksDb, { fieldName: 'task_id', unique: true });
await this.ensureIndex(tasksDb, { fieldName: 'status' });
await this.ensureIndex(tasksDb, { fieldName: 'server_id' });
await this.ensureIndex(tasksDb, { fieldName: 'created_at' });
await this.ensureIndex(tasksDb, { fieldName: 'next_poll_at' });
// 为结果表创建索引
await this.ensureIndex(resultsDb, { fieldName: 'task_id', unique: true });
await this.ensureIndex(resultsDb, { fieldName: 'expires_at' });
await this.ensureIndex(resultsDb, { fieldName: 'created_at' });
logger.info('NeDB indexes created successfully');
}
private async ensureIndex(db: DatastoreInstance, options: any): Promise<void> {
return new Promise((resolve, reject) => {
db.ensureIndex(options, (err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
public getDatabase(name: string): DatastoreInstance {
const db = this.databases.get(name);
if (!db) {
throw new Error(`Database ${name} not found`);
}
return db;
}
public isNeDBConnected(): boolean {
return this.isInitialized;
}
public async disconnect(): Promise<void> {
// NeDB 不需要显式断开连接,但可以清理资源
this.databases.clear();
this.isInitialized = false;
logger.info('NeDB disconnected');
}
// 数据库操作辅助方法
public async insert(dbName: string, doc: any): Promise<any> {
const db = this.getDatabase(dbName);
return new Promise((resolve, reject) => {
db.insert(doc, (err, newDoc) => {
if (err) {
reject(err);
} else {
resolve(newDoc);
}
});
});
}
public async findOne(dbName: string, query: any): Promise<any> {
const db = this.getDatabase(dbName);
return new Promise((resolve, reject) => {
db.findOne(query, (err, doc) => {
if (err) {
reject(err);
} else {
resolve(doc);
}
});
});
}
public async find(dbName: string, query: any, sort?: any, limit?: number): Promise<any[]> {
const db = this.getDatabase(dbName);
return new Promise((resolve, reject) => {
let cursor = db.find(query);
if (sort) {
cursor = cursor.sort(sort);
}
if (limit) {
cursor = cursor.limit(limit);
}
cursor.exec((err, docs) => {
if (err) {
reject(err);
} else {
resolve(docs);
}
});
});
}
public async update(dbName: string, query: any, update: any, options: any = {}): Promise<number> {
const db = this.getDatabase(dbName);
return new Promise((resolve, reject) => {
db.update(query, update, options, (err, numReplaced) => {
if (err) {
reject(err);
} else {
resolve(numReplaced);
}
});
});
}
public async remove(dbName: string, query: any, options: any = {}): Promise<number> {
const db = this.getDatabase(dbName);
return new Promise((resolve, reject) => {
db.remove(query, options, (err, numRemoved) => {
if (err) {
reject(err);
} else {
resolve(numRemoved);
}
});
});
}
public async count(dbName: string, query: any): Promise<number> {
const db = this.getDatabase(dbName);
return new Promise((resolve, reject) => {
db.count(query, (err, count) => {
if (err) {
reject(err);
} else {
resolve(count);
}
});
});
}
// 数据过期清理方法
public async cleanupExpiredData(): Promise<void> {
const currentTime = Math.floor(Date.now() / 1000);
try {
// 清理过期的结果数据
const removedResults = await this.remove('generation_results', {
expires_at: { $lt: currentTime }
}, { multi: true });
// 清理超过24小时的已完成任务
const taskExpireTime = currentTime - (24 * 60 * 60); // 24小时前
const removedTasks = await this.remove('generation_tasks', {
$and: [
{ status: { $in: ['completed', 'failed'] } },
{ updated_at: { $lt: taskExpireTime } }
]
}, { multi: true });
if (removedResults > 0 || removedTasks > 0) {
logger.info(`Cleaned up expired data: ${removedResults} results, ${removedTasks} tasks`);
}
} catch (error) {
logger.error('Failed to cleanup expired data:', error);
}
}
// 启动定期清理
public startPeriodicCleanup(): void {
// 每小时清理一次过期数据
setInterval(() => {
this.cleanupExpiredData();
}, 60 * 60 * 1000); // 1小时
logger.info('Periodic cleanup started for NeDB');
}
}
export default NeDBManager.getInstance();