// 외부 DB 연결 풀 관리 서비스 // 작성일: 2025-01-13 // 연결 풀 고갈 방지를 위한 중앙 관리 시스템 import { Pool } from "pg"; import mysql from "mysql2/promise"; import { ExternalDbConnection } from "../types/externalDbTypes"; import { ExternalDbConnectionService } from "./externalDbConnectionService"; import { PasswordEncryption } from "../utils/passwordEncryption"; import logger from "../utils/logger"; /** * 연결 풀 래퍼 인터페이스 * 모든 DB 타입의 연결 풀을 통일된 방식으로 관리 */ interface ConnectionPoolWrapper { pool: any; // 실제 연결 풀 객체 dbType: string; connectionId: number; createdAt: Date; lastUsedAt: Date; activeConnections: number; maxConnections: number; // 통일된 쿼리 실행 인터페이스 query(sql: string, params?: any[]): Promise; // 연결 풀 종료 disconnect(): Promise; // 연결 풀 상태 확인 isHealthy(): boolean; } /** * PostgreSQL 연결 풀 래퍼 */ class PostgresPoolWrapper implements ConnectionPoolWrapper { pool: Pool; dbType = "postgresql"; connectionId: number; createdAt: Date; lastUsedAt: Date; activeConnections = 0; maxConnections: number; constructor(config: ExternalDbConnection) { this.connectionId = config.id!; this.createdAt = new Date(); this.lastUsedAt = new Date(); this.maxConnections = config.max_connections || 10; this.pool = new Pool({ host: config.host, port: config.port, database: config.database_name, user: config.username, password: config.password, max: this.maxConnections, min: 2, // 최소 연결 수 idleTimeoutMillis: 30000, // 30초 동안 사용되지 않으면 연결 해제 connectionTimeoutMillis: (config.connection_timeout || 30) * 1000, statement_timeout: (config.query_timeout || 60) * 1000, ssl: config.ssl_enabled === "Y" ? { rejectUnauthorized: false } : false, }); // 연결 풀 이벤트 리스너 this.pool.on("connect", () => { this.activeConnections++; logger.debug( `[PostgreSQL] 새 연결 생성 (${this.activeConnections}/${this.maxConnections})` ); }); this.pool.on("remove", () => { this.activeConnections--; logger.debug( `[PostgreSQL] 연결 제거 (${this.activeConnections}/${this.maxConnections})` ); }); this.pool.on("error", (err) => { logger.error(`[PostgreSQL] 연결 풀 오류:`, err); }); } async query(sql: string, params?: any[]): Promise { this.lastUsedAt = new Date(); const result = await this.pool.query(sql, params); return result.rows; } async disconnect(): Promise { await this.pool.end(); logger.info(`[PostgreSQL] 연결 풀 종료 (ID: ${this.connectionId})`); } isHealthy(): boolean { return ( this.pool.totalCount > 0 && this.activeConnections < this.maxConnections ); } } /** * MySQL/MariaDB 연결 풀 래퍼 */ class MySQLPoolWrapper implements ConnectionPoolWrapper { pool: mysql.Pool; dbType: string; connectionId: number; createdAt: Date; lastUsedAt: Date; activeConnections = 0; maxConnections: number; constructor(config: ExternalDbConnection) { this.connectionId = config.id!; this.dbType = config.db_type; this.createdAt = new Date(); this.lastUsedAt = new Date(); this.maxConnections = config.max_connections || 10; this.pool = mysql.createPool({ host: config.host, port: config.port, database: config.database_name, user: config.username, password: config.password, connectionLimit: this.maxConnections, waitForConnections: true, queueLimit: 0, connectTimeout: (config.connection_timeout || 30) * 1000, ssl: config.ssl_enabled === "Y" ? { rejectUnauthorized: false } : undefined, }); // 연결 획득/해제 이벤트 추적 this.pool.on("acquire", () => { this.activeConnections++; logger.debug( `[${this.dbType.toUpperCase()}] 연결 획득 (${this.activeConnections}/${this.maxConnections})` ); }); this.pool.on("release", () => { this.activeConnections--; logger.debug( `[${this.dbType.toUpperCase()}] 연결 반환 (${this.activeConnections}/${this.maxConnections})` ); }); } async query(sql: string, params?: any[]): Promise { this.lastUsedAt = new Date(); const [rows] = await this.pool.execute(sql, params); return rows; } async disconnect(): Promise { await this.pool.end(); logger.info( `[${this.dbType.toUpperCase()}] 연결 풀 종료 (ID: ${this.connectionId})` ); } isHealthy(): boolean { return this.activeConnections < this.maxConnections; } } /** * 외부 DB 연결 풀 관리자 * 싱글톤 패턴으로 구현하여 전역적으로 연결 풀 관리 */ export class ExternalDbConnectionPoolService { private static instance: ExternalDbConnectionPoolService; private pools: Map = new Map(); private readonly IDLE_TIMEOUT = 10 * 60 * 1000; // 10분 동안 사용되지 않으면 풀 제거 private readonly HEALTH_CHECK_INTERVAL = 60 * 1000; // 1분마다 헬스 체크 private healthCheckTimer?: NodeJS.Timeout; private constructor() { this.startHealthCheck(); logger.info("🔌 외부 DB 연결 풀 서비스 초기화 완료"); } /** * 싱글톤 인스턴스 반환 */ static getInstance(): ExternalDbConnectionPoolService { if (!ExternalDbConnectionPoolService.instance) { ExternalDbConnectionPoolService.instance = new ExternalDbConnectionPoolService(); } return ExternalDbConnectionPoolService.instance; } /** * 연결 풀 가져오기 (없으면 생성) */ async getPool(connectionId: number): Promise { // 기존 풀이 있으면 반환 if (this.pools.has(connectionId)) { const pool = this.pools.get(connectionId)!; pool.lastUsedAt = new Date(); // 헬스 체크 if (!pool.isHealthy()) { logger.warn( `⚠️ 연결 풀 비정상 감지 (ID: ${connectionId}), 재생성 중...` ); await this.removePool(connectionId); return this.createPool(connectionId); } logger.debug(`✅ 기존 연결 풀 재사용 (ID: ${connectionId})`); return pool; } // 새로운 풀 생성 return this.createPool(connectionId); } /** * 새로운 연결 풀 생성 */ private async createPool( connectionId: number ): Promise { logger.info(`🔧 새 연결 풀 생성 중 (ID: ${connectionId})...`); // DB 연결 정보 조회 const connectionResult = await ExternalDbConnectionService.getConnectionById(connectionId); if (!connectionResult.success || !connectionResult.data) { throw new Error(`연결 정보를 찾을 수 없습니다 (ID: ${connectionId})`); } const config = connectionResult.data; // 비활성화된 연결은 사용 불가 if (config.is_active !== "Y") { throw new Error(`비활성화된 연결입니다 (ID: ${connectionId})`); } // 비밀번호 복호화 try { config.password = PasswordEncryption.decrypt(config.password); } catch (error) { logger.error(`비밀번호 복호화 실패 (ID: ${connectionId}):`, error); throw new Error("비밀번호 복호화에 실패했습니다"); } // DB 타입에 따라 적절한 풀 생성 let pool: ConnectionPoolWrapper; switch (config.db_type.toLowerCase()) { case "postgresql": pool = new PostgresPoolWrapper(config); break; case "mysql": case "mariadb": pool = new MySQLPoolWrapper(config); break; case "oracle": case "mssql": // TODO: Oracle과 MSSQL 지원 추가 throw new Error(`${config.db_type}는 아직 지원되지 않습니다`); default: throw new Error(`지원하지 않는 DB 타입: ${config.db_type}`); } this.pools.set(connectionId, pool); logger.info( `✅ 연결 풀 생성 완료 (ID: ${connectionId}, 타입: ${config.db_type}, 최대: ${pool.maxConnections})` ); return pool; } /** * 연결 풀 제거 */ async removePool(connectionId: number): Promise { const pool = this.pools.get(connectionId); if (pool) { await pool.disconnect(); this.pools.delete(connectionId); logger.info(`🗑️ 연결 풀 제거됨 (ID: ${connectionId})`); } } /** * 쿼리 실행 (자동으로 연결 풀 관리) */ async executeQuery( connectionId: number, sql: string, params?: any[] ): Promise { const pool = await this.getPool(connectionId); try { logger.debug( `📊 쿼리 실행 (ID: ${connectionId}): ${sql.substring(0, 100)}...` ); const result = await pool.query(sql, params); logger.debug( `✅ 쿼리 완료 (ID: ${connectionId}), 결과: ${result.length}건` ); return result; } catch (error) { logger.error(`❌ 쿼리 실행 실패 (ID: ${connectionId}):`, error); throw error; } } /** * 연결 테스트 (풀을 생성하지 않고 단순 연결만 테스트) */ async testConnection(connectionId: number): Promise { try { const pool = await this.getPool(connectionId); // 간단한 쿼리로 연결 테스트 const testQuery = pool.dbType === "postgresql" ? "SELECT 1 as test" : "SELECT 1 as test"; await pool.query(testQuery); logger.info(`✅ 연결 테스트 성공 (ID: ${connectionId})`); return true; } catch (error) { logger.error(`❌ 연결 테스트 실패 (ID: ${connectionId}):`, error); return false; } } /** * 주기적인 헬스 체크 및 유휴 풀 정리 */ private startHealthCheck(): void { this.healthCheckTimer = setInterval(() => { const now = Date.now(); this.pools.forEach(async (pool, connectionId) => { const idleTime = now - pool.lastUsedAt.getTime(); // 유휴 시간 초과 시 풀 제거 if (idleTime > this.IDLE_TIMEOUT) { logger.info( `🧹 유휴 연결 풀 정리 (ID: ${connectionId}, 유휴: ${Math.round(idleTime / 1000)}초)` ); await this.removePool(connectionId); } // 헬스 체크 if (!pool.isHealthy()) { logger.warn( `⚠️ 비정상 연결 풀 감지 (ID: ${connectionId}), 재생성 예약` ); await this.removePool(connectionId); } }); // 상태 로깅 if (this.pools.size > 0) { logger.debug( `📊 연결 풀 상태: 총 ${this.pools.size}개, 활성: ${Array.from(this.pools.values()).filter((p) => p.activeConnections > 0).length}개` ); } }, this.HEALTH_CHECK_INTERVAL); logger.info("🔍 헬스 체크 타이머 시작 (간격: 1분)"); } /** * 모든 연결 풀 종료 (애플리케이션 종료 시 호출) */ async closeAll(): Promise { logger.info(`🛑 모든 연결 풀 종료 중... (총 ${this.pools.size}개)`); if (this.healthCheckTimer) { clearInterval(this.healthCheckTimer); } const closePromises = Array.from(this.pools.keys()).map((connectionId) => this.removePool(connectionId) ); await Promise.all(closePromises); logger.info("✅ 모든 연결 풀 종료 완료"); } /** * 현재 연결 풀 상태 조회 */ getPoolsStatus(): Array<{ connectionId: number; dbType: string; activeConnections: number; maxConnections: number; createdAt: Date; lastUsedAt: Date; idleSeconds: number; }> { const now = Date.now(); return Array.from(this.pools.entries()).map(([connectionId, pool]) => ({ connectionId, dbType: pool.dbType, activeConnections: pool.activeConnections, maxConnections: pool.maxConnections, createdAt: pool.createdAt, lastUsedAt: pool.lastUsedAt, idleSeconds: Math.round((now - pool.lastUsedAt.getTime()) / 1000), })); } } // 애플리케이션 종료 시 연결 풀 정리 process.on("SIGINT", async () => { logger.info("🛑 SIGINT 신호 수신, 연결 풀 정리 중..."); await ExternalDbConnectionPoolService.getInstance().closeAll(); process.exit(0); }); process.on("SIGTERM", async () => { logger.info("🛑 SIGTERM 신호 수신, 연결 풀 정리 중..."); await ExternalDbConnectionPoolService.getInstance().closeAll(); process.exit(0); });