ERP-node/backend-node/src/services/externalDbConnectionPoolSer...

494 lines
14 KiB
TypeScript

// 외부 DB 연결 풀 관리 서비스
// 작성일: 2025-01-13
// 연결 풀 고갈 방지를 위한 중앙 관리 시스템
import { Pool } from "pg";
import mysql from "mysql2/promise";
import { ExternalDbConnection } from "../types/externalDbTypes";
import { ExternalDbConnectionService } from "./externalDbConnectionService";
import { PasswordEncryption } from "../utils/passwordEncryption";
import logger from "../utils/logger";
/**
* 연결 풀 래퍼 인터페이스
* 모든 DB 타입의 연결 풀을 통일된 방식으로 관리
*/
interface ConnectionPoolWrapper {
pool: any; // 실제 연결 풀 객체
dbType: string;
connectionId: number;
createdAt: Date;
lastUsedAt: Date;
activeConnections: number;
maxConnections: number;
// 통일된 쿼리 실행 인터페이스
query(sql: string, params?: any[]): Promise<any>;
// 연결 풀 종료
disconnect(): Promise<void>;
// 연결 풀 상태 확인
isHealthy(): boolean;
}
/**
* PostgreSQL 연결 풀 래퍼
*/
class PostgresPoolWrapper implements ConnectionPoolWrapper {
pool: Pool;
dbType = "postgresql";
connectionId: number;
createdAt: Date;
lastUsedAt: Date;
activeConnections = 0;
maxConnections: number;
constructor(config: ExternalDbConnection) {
this.connectionId = config.id!;
this.createdAt = new Date();
this.lastUsedAt = new Date();
this.maxConnections = config.max_connections || 10;
this.pool = new Pool({
host: config.host,
port: config.port,
database: config.database_name,
user: config.username,
password: config.password,
max: this.maxConnections,
min: 2, // 최소 연결 수
idleTimeoutMillis: 30000, // 30초 동안 사용되지 않으면 연결 해제
connectionTimeoutMillis: (config.connection_timeout || 30) * 1000,
statement_timeout: (config.query_timeout || 60) * 1000,
ssl: config.ssl_enabled === "Y" ? { rejectUnauthorized: false } : false,
});
// 연결 풀 이벤트 리스너
this.pool.on("connect", () => {
this.activeConnections++;
logger.debug(
`[PostgreSQL] 새 연결 생성 (${this.activeConnections}/${this.maxConnections})`
);
});
this.pool.on("remove", () => {
this.activeConnections--;
logger.debug(
`[PostgreSQL] 연결 제거 (${this.activeConnections}/${this.maxConnections})`
);
});
this.pool.on("error", (err) => {
logger.error(`[PostgreSQL] 연결 풀 오류:`, err);
});
}
async query(sql: string, params?: any[]): Promise<any> {
this.lastUsedAt = new Date();
const result = await this.pool.query(sql, params);
return result.rows;
}
async disconnect(): Promise<void> {
await this.pool.end();
logger.info(`[PostgreSQL] 연결 풀 종료 (ID: ${this.connectionId})`);
}
isHealthy(): boolean {
return (
this.pool.totalCount > 0 && this.activeConnections < this.maxConnections
);
}
}
/**
* MySQL/MariaDB 연결 풀 래퍼
*/
class MySQLPoolWrapper implements ConnectionPoolWrapper {
pool: mysql.Pool;
dbType: string;
connectionId: number;
createdAt: Date;
lastUsedAt: Date;
activeConnections = 0;
maxConnections: number;
private isPoolClosed = false;
constructor(config: ExternalDbConnection) {
this.connectionId = config.id!;
this.dbType = config.db_type;
this.createdAt = new Date();
this.lastUsedAt = new Date();
this.maxConnections = config.max_connections || 10;
this.pool = mysql.createPool({
host: config.host,
port: config.port,
database: config.database_name,
user: config.username,
password: config.password,
connectionLimit: this.maxConnections,
waitForConnections: true,
queueLimit: 0,
connectTimeout: (config.connection_timeout || 30) * 1000,
// 연결 유지 및 자동 재연결 설정
enableKeepAlive: true,
keepAliveInitialDelay: 10000, // 10초마다 keep-alive 패킷 전송
ssl:
config.ssl_enabled === "Y" ? { rejectUnauthorized: false } : undefined,
});
// 연결 획득/해제 이벤트 추적
this.pool.on("acquire", () => {
this.activeConnections++;
logger.debug(
`[${this.dbType.toUpperCase()}] 연결 획득 (${this.activeConnections}/${this.maxConnections})`
);
});
this.pool.on("release", () => {
this.activeConnections--;
logger.debug(
`[${this.dbType.toUpperCase()}] 연결 반환 (${this.activeConnections}/${this.maxConnections})`
);
});
}
async query(sql: string, params?: any[]): Promise<any> {
this.lastUsedAt = new Date();
// 연결 풀이 닫힌 상태인지 확인
if (this.isPoolClosed) {
throw new Error("연결 풀이 닫힌 상태입니다. 재연결이 필요합니다.");
}
try {
const [rows] = await this.pool.execute(sql, params);
return rows;
} catch (error: any) {
// 연결 닫힘 오류 감지
if (
error.message.includes("closed state") ||
error.code === "PROTOCOL_CONNECTION_LOST" ||
error.code === "ECONNRESET"
) {
this.isPoolClosed = true;
logger.warn(
`[${this.dbType.toUpperCase()}] 연결 끊김 감지 (ID: ${this.connectionId})`
);
}
throw error;
}
}
async disconnect(): Promise<void> {
this.isPoolClosed = true;
await this.pool.end();
logger.info(
`[${this.dbType.toUpperCase()}] 연결 풀 종료 (ID: ${this.connectionId})`
);
}
isHealthy(): boolean {
// 연결 풀이 닫혔으면 비정상
if (this.isPoolClosed) {
return false;
}
return this.activeConnections < this.maxConnections;
}
}
/**
* 외부 DB 연결 풀 관리자
* 싱글톤 패턴으로 구현하여 전역적으로 연결 풀 관리
*/
export class ExternalDbConnectionPoolService {
private static instance: ExternalDbConnectionPoolService;
private pools: Map<number, ConnectionPoolWrapper> = new Map();
private readonly IDLE_TIMEOUT = 10 * 60 * 1000; // 10분 동안 사용되지 않으면 풀 제거
private readonly HEALTH_CHECK_INTERVAL = 60 * 1000; // 1분마다 헬스 체크
private healthCheckTimer?: NodeJS.Timeout;
private constructor() {
this.startHealthCheck();
logger.info("🔌 외부 DB 연결 풀 서비스 초기화 완료");
}
/**
* 싱글톤 인스턴스 반환
*/
static getInstance(): ExternalDbConnectionPoolService {
if (!ExternalDbConnectionPoolService.instance) {
ExternalDbConnectionPoolService.instance =
new ExternalDbConnectionPoolService();
}
return ExternalDbConnectionPoolService.instance;
}
/**
* 연결 풀 가져오기 (없으면 생성)
*/
async getPool(connectionId: number): Promise<ConnectionPoolWrapper> {
// 기존 풀이 있으면 반환
if (this.pools.has(connectionId)) {
const pool = this.pools.get(connectionId)!;
pool.lastUsedAt = new Date();
// 헬스 체크
if (!pool.isHealthy()) {
logger.warn(
`⚠️ 연결 풀 비정상 감지 (ID: ${connectionId}), 재생성 중...`
);
await this.removePool(connectionId);
return this.createPool(connectionId);
}
logger.debug(`✅ 기존 연결 풀 재사용 (ID: ${connectionId})`);
return pool;
}
// 새로운 풀 생성
return this.createPool(connectionId);
}
/**
* 새로운 연결 풀 생성
*/
private async createPool(
connectionId: number
): Promise<ConnectionPoolWrapper> {
logger.info(`🔧 새 연결 풀 생성 중 (ID: ${connectionId})...`);
// DB 연결 정보 조회 (실제 비밀번호 포함)
const connectionResult =
await ExternalDbConnectionService.getConnectionByIdWithPassword(
connectionId
);
if (!connectionResult.success || !connectionResult.data) {
throw new Error(`연결 정보를 찾을 수 없습니다 (ID: ${connectionId})`);
}
const config = connectionResult.data;
// 비활성화된 연결은 사용 불가
if (config.is_active !== "Y") {
throw new Error(`비활성화된 연결입니다 (ID: ${connectionId})`);
}
// 비밀번호 복호화
try {
config.password = PasswordEncryption.decrypt(config.password);
} catch (error) {
logger.error(`비밀번호 복호화 실패 (ID: ${connectionId}):`, error);
throw new Error("비밀번호 복호화에 실패했습니다");
}
// DB 타입에 따라 적절한 풀 생성
let pool: ConnectionPoolWrapper;
switch (config.db_type.toLowerCase()) {
case "postgresql":
pool = new PostgresPoolWrapper(config);
break;
case "mysql":
case "mariadb":
pool = new MySQLPoolWrapper(config);
break;
case "oracle":
case "mssql":
// TODO: Oracle과 MSSQL 지원 추가
throw new Error(`${config.db_type}는 아직 지원되지 않습니다`);
default:
throw new Error(`지원하지 않는 DB 타입: ${config.db_type}`);
}
this.pools.set(connectionId, pool);
logger.info(
`✅ 연결 풀 생성 완료 (ID: ${connectionId}, 타입: ${config.db_type}, 최대: ${pool.maxConnections})`
);
return pool;
}
/**
* 연결 풀 제거
*/
async removePool(connectionId: number): Promise<void> {
const pool = this.pools.get(connectionId);
if (pool) {
await pool.disconnect();
this.pools.delete(connectionId);
logger.info(`🗑️ 연결 풀 제거됨 (ID: ${connectionId})`);
}
}
/**
* 쿼리 실행 (자동으로 연결 풀 관리 + 재시도 로직)
*/
async executeQuery(
connectionId: number,
sql: string,
params?: any[],
retryCount = 0
): Promise<any> {
const MAX_RETRIES = 2;
try {
const pool = await this.getPool(connectionId);
logger.debug(
`📊 쿼리 실행 (ID: ${connectionId}): ${sql.substring(0, 100)}...`
);
const result = await pool.query(sql, params);
logger.debug(
`✅ 쿼리 완료 (ID: ${connectionId}), 결과: ${result.length}`
);
return result;
} catch (error: any) {
// 연결 끊김 오류인 경우 재시도
const isConnectionError =
error.message?.includes("closed state") ||
error.message?.includes("연결 풀이 닫힌 상태") ||
error.code === "PROTOCOL_CONNECTION_LOST" ||
error.code === "ECONNRESET" ||
error.code === "ETIMEDOUT";
if (isConnectionError && retryCount < MAX_RETRIES) {
logger.warn(
`🔄 연결 오류 감지, 재시도 중... (${retryCount + 1}/${MAX_RETRIES}) (ID: ${connectionId})`
);
// 기존 풀 제거 후 새로 생성
await this.removePool(connectionId);
// 잠시 대기 후 재시도
await new Promise((resolve) => setTimeout(resolve, 500));
return this.executeQuery(connectionId, sql, params, retryCount + 1);
}
logger.error(`❌ 쿼리 실행 실패 (ID: ${connectionId}):`, error);
throw error;
}
}
/**
* 연결 테스트 (풀을 생성하지 않고 단순 연결만 테스트)
*/
async testConnection(connectionId: number): Promise<boolean> {
try {
const pool = await this.getPool(connectionId);
// 간단한 쿼리로 연결 테스트
const testQuery =
pool.dbType === "postgresql" ? "SELECT 1 as test" : "SELECT 1 as test";
await pool.query(testQuery);
logger.info(`✅ 연결 테스트 성공 (ID: ${connectionId})`);
return true;
} catch (error) {
logger.error(`❌ 연결 테스트 실패 (ID: ${connectionId}):`, error);
return false;
}
}
/**
* 주기적인 헬스 체크 및 유휴 풀 정리
*/
private startHealthCheck(): void {
this.healthCheckTimer = setInterval(() => {
const now = Date.now();
this.pools.forEach(async (pool, connectionId) => {
const idleTime = now - pool.lastUsedAt.getTime();
// 유휴 시간 초과 시 풀 제거
if (idleTime > this.IDLE_TIMEOUT) {
logger.info(
`🧹 유휴 연결 풀 정리 (ID: ${connectionId}, 유휴: ${Math.round(idleTime / 1000)}초)`
);
await this.removePool(connectionId);
}
// 헬스 체크
if (!pool.isHealthy()) {
logger.warn(
`⚠️ 비정상 연결 풀 감지 (ID: ${connectionId}), 재생성 예약`
);
await this.removePool(connectionId);
}
});
// 상태 로깅
if (this.pools.size > 0) {
logger.debug(
`📊 연결 풀 상태: 총 ${this.pools.size}개, 활성: ${Array.from(this.pools.values()).filter((p) => p.activeConnections > 0).length}`
);
}
}, this.HEALTH_CHECK_INTERVAL);
logger.info("🔍 헬스 체크 타이머 시작 (간격: 1분)");
}
/**
* 모든 연결 풀 종료 (애플리케이션 종료 시 호출)
*/
async closeAll(): Promise<void> {
logger.info(`🛑 모든 연결 풀 종료 중... (총 ${this.pools.size}개)`);
if (this.healthCheckTimer) {
clearInterval(this.healthCheckTimer);
}
const closePromises = Array.from(this.pools.keys()).map((connectionId) =>
this.removePool(connectionId)
);
await Promise.all(closePromises);
logger.info("✅ 모든 연결 풀 종료 완료");
}
/**
* 현재 연결 풀 상태 조회
*/
getPoolsStatus(): Array<{
connectionId: number;
dbType: string;
activeConnections: number;
maxConnections: number;
createdAt: Date;
lastUsedAt: Date;
idleSeconds: number;
}> {
const now = Date.now();
return Array.from(this.pools.entries()).map(([connectionId, pool]) => ({
connectionId,
dbType: pool.dbType,
activeConnections: pool.activeConnections,
maxConnections: pool.maxConnections,
createdAt: pool.createdAt,
lastUsedAt: pool.lastUsedAt,
idleSeconds: Math.round((now - pool.lastUsedAt.getTime()) / 1000),
}));
}
}
// 애플리케이션 종료 시 연결 풀 정리
process.on("SIGINT", async () => {
logger.info("🛑 SIGINT 신호 수신, 연결 풀 정리 중...");
await ExternalDbConnectionPoolService.getInstance().closeAll();
process.exit(0);
});
process.on("SIGTERM", async () => {
logger.info("🛑 SIGTERM 신호 수신, 연결 풀 정리 중...");
await ExternalDbConnectionPoolService.getInstance().closeAll();
process.exit(0);
});