/**
 * External DB connection helper.
 *
 * Manages pooled connections to external databases that flows move data
 * to and from. Supported engines: PostgreSQL, MySQL/MariaDB, MSSQL, Oracle.
 */
import { Pool as PgPool } from "pg";
import * as mysql from "mysql2/promise";
import db from "../database/db";
import { PasswordEncryption } from "../utils/passwordEncryption";
import {
  getConnectionTestQuery,
  getPlaceholder,
  getBeginTransactionQuery,
  getCommitQuery,
  getRollbackQuery,
} from "./dbQueryBuilder";

/** Connection settings loaded from the `external_db_connections` table. */
interface ExternalDbConnection {
  id: number;
  connectionName: string;
  dbType: string;
  host: string;
  port: number;
  database: string;
  username: string;
  password: string;
  isActive: boolean;
}

/** A cached pool together with the (lower-cased) engine type it belongs to. */
interface ExternalPoolInfo {
  pool: any;
  dbType: string;
}

/** Driver-independent query result returned by `executePoolQuery`. */
interface ExternalQueryResult {
  rows: any[];
  rowCount: number;
}

// Cache of external DB pools keyed by connection id. The pool object differs
// per engine, so the engine type is stored next to it.
const connectionPools = new Map<number, ExternalPoolInfo>();

/**
 * Loads the settings of an active external DB connection.
 *
 * @param connectionId Primary key in `external_db_connections`.
 * @returns The connection settings with the password decrypted, or `null`
 *          when no active row with that id exists.
 */
async function getExternalConnection(
  connectionId: number
): Promise<ExternalDbConnection | null> {
  const query = `SELECT * FROM external_db_connections WHERE id = $1 AND is_active = 'Y'`;
  const result = await db.query(query, [connectionId]);

  if (result.length === 0) {
    return null;
  }

  const row = result[0];

  // The `password` column holds the encrypted password.
  let decryptedPassword = "";
  try {
    decryptedPassword = PasswordEncryption.decrypt(row.password);
  } catch (error) {
    console.error(`비밀번호 복호화 실패 (ID: ${connectionId}):`, error);
    // Fallback: use the stored value as-is when it cannot be decrypted.
    decryptedPassword = row.password;
  }

  return {
    id: row.id,
    connectionName: row.connection_name,
    dbType: row.db_type,
    host: row.host,
    port: row.port,
    database: row.database_name,
    username: row.username,
    password: decryptedPassword,
    // The column stores 'Y'/'N'; the interface exposes a real boolean.
    isActive: row.is_active === "Y",
  };
}

/**
 * Returns a pool for the given external connection, creating and caching it
 * on first use.
 *
 * A cached pool is health-checked with the engine's test query before being
 * returned. If the check fails, or the connection row is no longer active,
 * the cached pool is closed and evicted.
 *
 * @param connectionId Primary key in `external_db_connections`.
 * @returns `{ pool, dbType }` where `dbType` is lower-cased.
 * @throws Error when the connection row is missing/inactive, the engine type
 *         is unsupported, or the connection test fails.
 */
export async function getExternalPool(
  connectionId: number
): Promise<ExternalPoolInfo> {
  const connection = await getExternalConnection(connectionId);

  const cached = connectionPools.get(connectionId);
  if (cached) {
    if (connection) {
      try {
        // Use the engine type stored with the pool: it is what the pool was
        // actually created for.
        const testQuery = getConnectionTestQuery(cached.dbType);
        await executePoolQuery(cached.pool, cached.dbType, testQuery, []);
        return cached;
      } catch (error) {
        // Fall through to eviction and re-creation below.
      }
    }

    console.warn(
      `캐시된 외부 DB 연결 풀 무효화 (ID: ${connectionId}), 재생성합니다.`
    );
    // Only evict the entry we inspected; another caller may have replaced it
    // while we were awaiting.
    if (connectionPools.get(connectionId) === cached) {
      connectionPools.delete(connectionId);
    }
    await closePool(cached.pool, cached.dbType);
  }

  if (!connection) {
    throw new Error(
      `외부 DB 연결 정보를 찾을 수 없습니다 (ID: ${connectionId})`
    );
  }

  const dbType = connection.dbType.toLowerCase();
  let pool: any;

  try {
    switch (dbType) {
      case "postgresql":
        pool = await createPostgreSQLPool(connection);
        break;
      case "mysql":
      case "mariadb":
        pool = await createMySQLPool(connection);
        break;
      case "mssql":
        pool = await createMSSQLPool(connection);
        break;
      case "oracle":
        pool = await createOraclePool(connection);
        break;
      default:
        throw new Error(`지원하지 않는 DB 타입입니다: ${connection.dbType}`);
    }

    // Verify the new pool can actually reach the server.
    const testQuery = getConnectionTestQuery(dbType);
    await executePoolQuery(pool, dbType, testQuery, []);

    console.log(
      `✅ 외부 DB 연결 풀 생성 성공 (ID: ${connectionId}, ${connection.connectionName}, ${connection.dbType})`
    );

    // Cache the pool together with its engine type.
    const poolInfo: ExternalPoolInfo = { pool, dbType };
    connectionPools.set(connectionId, poolInfo);
    return poolInfo;
  } catch (error) {
    // Do not leak a half-initialised pool.
    if (pool) {
      await closePool(pool, dbType);
    }
    throw new Error(
      `외부 DB 연결 실패 (${connection.connectionName}, ${connection.dbType}): ${error}`
    );
  }
}

/**
 * Creates a PostgreSQL pool (max 5 clients, 30s idle timeout, 5s connect
 * timeout).
 */
async function createPostgreSQLPool(
  connection: ExternalDbConnection
): Promise<PgPool> {
  const pool = new PgPool({
    host: connection.host,
    port: connection.port,
    database: connection.database,
    user: connection.username,
    password: connection.password,
    max: 5,
    idleTimeoutMillis: 30000,
    connectionTimeoutMillis: 5000,
  });

  // pg emits 'error' on the pool when an idle client fails (e.g. the server
  // restarts). Without a listener that becomes an unhandled 'error' event and
  // terminates the process.
  pool.on("error", (error) => {
    console.error(
      `외부 PostgreSQL 풀 유휴 클라이언트 오류 (ID: ${connection.id}):`,
      error
    );
  });

  return pool;
}

/**
 * Creates a MySQL/MariaDB pool (up to 5 connections, unbounded wait queue).
 */
async function createMySQLPool(
  connection: ExternalDbConnection
): Promise<mysql.Pool> {
  return mysql.createPool({
    host: connection.host,
    port: connection.port,
    database: connection.database,
    user: connection.username,
    password: connection.password,
    connectionLimit: 5,
    waitForConnections: true,
    queueLimit: 0,
  });
}

/**
 * Creates a dedicated MSSQL pool.
 *
 * The `mssql` package is loaded lazily so it is only required when an MSSQL
 * connection is actually used.
 *
 * @throws Error when the package cannot be loaded or the connection fails.
 */
async function createMSSQLPool(connection: ExternalDbConnection): Promise<any> {
  try {
    const sql = require("mssql");

    const config = {
      user: connection.username,
      password: connection.password,
      server: connection.host,
      port: connection.port,
      database: connection.database,
      options: {
        encrypt: true,
        trustServerCertificate: true,
        enableArithAbort: true,
      },
      pool: {
        max: 5,
        min: 0,
        idleTimeoutMillis: 30000,
      },
    };

    // `sql.connect()` manages a single global pool, so a second MSSQL
    // connection with a different config would silently reuse the first one.
    // Each external connection gets its own ConnectionPool instead.
    const pool = new sql.ConnectionPool(config);
    await pool.connect();
    return pool;
  } catch (error) {
    throw new Error(
      `MSSQL 연결 실패: mssql 패키지가 설치되어 있는지 확인하세요. (${error})`
    );
  }
}

/**
 * Creates an Oracle pool (1-5 connections).
 *
 * The `oracledb` package is loaded lazily. When `ORACLE_INSTANT_CLIENT_PATH`
 * is set and the client has not been initialised yet, the Instant Client is
 * initialised from that directory.
 *
 * @throws Error when the package cannot be loaded or pool creation fails.
 */
async function createOraclePool(
  connection: ExternalDbConnection
): Promise<any> {
  try {
    const oracledb = require("oracledb");

    // Initialise the Oracle client only once.
    if (!oracledb.oracleClientVersion) {
      // The Instant Client location can be supplied via environment variable.
      const instantClientPath = process.env.ORACLE_INSTANT_CLIENT_PATH;
      if (instantClientPath) {
        oracledb.initOracleClient({ libDir: instantClientPath });
      }
    }

    // A database value containing "/" is treated as a full connect string.
    const connectString = connection.database.includes("/")
      ? connection.database
      : `${connection.host}:${connection.port}/${connection.database}`;

    const pool = await oracledb.createPool({
      user: connection.username,
      password: connection.password,
      connectString: connectString,
      poolMin: 1,
      poolMax: 5,
      poolIncrement: 1,
      poolTimeout: 60, // release idle connections after 60 seconds
      queueTimeout: 5000, // wait at most 5 seconds for a free connection
      enableStatistics: true,
    });

    return pool;
  } catch (error: any) {
    throw new Error(
      `Oracle 연결 실패: ${error.message}. oracledb 패키지와 Oracle Instant Client가 설치되어 있는지 확인하세요.`
    );
  }
}

/**
 * Runs a single statement on a pool and normalises the driver result.
 *
 * @param pool   Pool object created by one of the `create*Pool` helpers.
 * @param dbType Engine type (case-insensitive).
 * @param query  SQL text using the engine's own placeholder syntax.
 * @param params Positional bind values.
 * @returns `rows` (empty for statements without a result set, except MySQL
 *          where the result header is wrapped in a one-element array) and
 *          `rowCount` (returned rows for queries, affected rows for DML).
 * @throws Error for unsupported engine types; driver errors propagate.
 */
async function executePoolQuery(
  pool: any,
  dbType: string,
  query: string,
  params: any[]
): Promise<ExternalQueryResult> {
  const normalizedType = dbType.toLowerCase();

  switch (normalizedType) {
    case "postgresql": {
      const result = await pool.query(query, params);
      return { rows: result.rows, rowCount: result.rowCount ?? 0 };
    }

    case "mysql":
    case "mariadb": {
      const [rows] = await pool.query(query, params);
      if (Array.isArray(rows)) {
        return { rows, rowCount: rows.length };
      }
      // DML returns a result header object, not an array of rows.
      return { rows: [rows], rowCount: rows?.affectedRows ?? 0 };
    }

    case "mssql": {
      const request = pool.request();
      // MSSQL uses named parameters: @p1, @p2, ...
      params.forEach((param, index) => {
        request.input(`p${index + 1}`, param);
      });
      const result = await request.query(query);
      // `recordset` is undefined for statements without a result set and
      // `rowsAffected` is an array with one entry per statement.
      if (result.recordset) {
        return { rows: result.recordset, rowCount: result.recordset.length };
      }
      const affected = Array.isArray(result.rowsAffected)
        ? result.rowsAffected.reduce(
            (sum: number, count: number) => sum + count,
            0
          )
        : 0;
      return { rows: [], rowCount: affected };
    }

    case "oracle": {
      const oracledb = require("oracledb");
      const connection = await pool.getConnection();
      try {
        // Oracle uses :1, :2 style bind variables.
        const result = await connection.execute(query, params, {
          // The connection is released right after this statement, which
          // rolls back anything uncommitted, so the statement must commit
          // itself (as pool-level queries on the other engines do).
          autoCommit: true,
          outFormat: oracledb.OUT_FORMAT_OBJECT, // rows as objects
        });
        const rows = result.rows || [];
        return { rows, rowCount: result.rowsAffected ?? rows.length };
      } finally {
        await connection.close();
      }
    }

    default:
      throw new Error(`지원하지 않는 DB 타입: ${dbType}`);
  }
}

/**
 * Closes a pool using the engine-specific API. Errors are logged, never
 * thrown, so this is safe to call from cleanup paths.
 */
async function closePool(pool: any, dbType: string): Promise<void> {
  const normalizedType = dbType.toLowerCase();

  try {
    switch (normalizedType) {
      case "postgresql":
      case "mysql":
      case "mariadb":
        await pool.end();
        break;
      case "mssql":
      case "oracle":
        await pool.close();
        break;
    }
  } catch (error) {
    console.error(`풀 종료 오류 (${dbType}):`, error);
  }
}

/**
 * Runs a single statement against an external DB.
 *
 * @param connectionId Primary key in `external_db_connections`.
 * @param query        SQL text using the target engine's placeholder syntax.
 * @param params       Positional bind values (default: none).
 * @returns `{ rows, rowCount }`.
 */
export async function executeExternalQuery(
  connectionId: number,
  query: string,
  params: any[] = []
): Promise<any> {
  const poolInfo = await getExternalPool(connectionId);
  return await executePoolQuery(poolInfo.pool, poolInfo.dbType, query, params);
}

/**
 * Runs `callback` inside a transaction on an external DB.
 *
 * The callback receives the engine-specific transactional client (pg client,
 * mysql2 connection, mssql Transaction, or oracledb connection) and the
 * engine type. The transaction is committed when the callback resolves and
 * rolled back when anything throws; the original error is then rethrown.
 *
 * @param connectionId Primary key in `external_db_connections`.
 * @param callback     Work to perform inside the transaction.
 * @returns Whatever the callback resolves to.
 */
export async function executeExternalTransaction(
  connectionId: number,
  callback: (client: any, dbType: string) => Promise<any>
): Promise<any> {
  const poolInfo = await getExternalPool(connectionId);
  const { pool, dbType } = poolInfo;
  const normalizedType = dbType.toLowerCase();

  let client: any;

  try {
    switch (normalizedType) {
      case "postgresql": {
        client = await pool.connect();
        await client.query(getBeginTransactionQuery(dbType));
        const result = await callback(client, dbType);
        await client.query(getCommitQuery(dbType));
        return result;
      }

      case "mysql":
      case "mariadb": {
        client = await pool.getConnection();
        await client.beginTransaction();
        const result = await callback(client, dbType);
        await client.commit();
        return result;
      }

      case "mssql": {
        // ConnectionPool has no static `Transaction`; the pool instance
        // creates transactions bound to itself.
        const transaction = pool.transaction();
        await transaction.begin();
        client = transaction;
        const result = await callback(client, dbType);
        await transaction.commit();
        return result;
      }

      case "oracle": {
        client = await pool.getConnection();
        // Oracle starts a transaction implicitly; only the commit is explicit.
        const result = await callback(client, dbType);
        await client.commit();
        return result;
      }

      default:
        throw new Error(`지원하지 않는 DB 타입: ${dbType}`);
    }
  } catch (error) {
    console.error(`외부 DB 트랜잭션 오류 (ID: ${connectionId}):`, error);

    // Best-effort rollback; a rollback failure must not mask the real error.
    if (client) {
      try {
        switch (normalizedType) {
          case "postgresql":
            await client.query(getRollbackQuery(dbType));
            break;
          case "mysql":
          case "mariadb":
          case "mssql":
          case "oracle":
            await client.rollback();
            break;
        }
      } catch (rollbackError) {
        console.error("트랜잭션 롤백 오류:", rollbackError);
      }
    }

    throw error;
  } finally {
    // Return the client/connection to its pool.
    if (client) {
      try {
        switch (normalizedType) {
          case "postgresql":
          case "mysql":
          case "mariadb":
            client.release();
            break;
          case "oracle":
            await client.close();
            break;
          case "mssql":
            // The mssql Transaction releases its connection on
            // commit/rollback; nothing to do here.
            break;
        }
      } catch (releaseError) {
        console.error("클라이언트 해제 오류:", releaseError);
      }
    }
  }
}