From 0dc4d538760c96f269ddca23b844a7538e458a4a Mon Sep 17 00:00:00 2001 From: kjs Date: Mon, 13 Oct 2025 17:47:24 +0900 Subject: [PATCH] =?UTF-8?q?=EC=A0=9C=EC=96=B4=EA=B4=80=EB=A6=AC=20?= =?UTF-8?q?=EB=85=B8=EB=93=9C=20=EC=9E=91=EB=8F=99=20=EB=B0=A9=EC=8B=9D=20?= =?UTF-8?q?=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/routes/externalDbConnectionRoutes.ts | 36 ++ .../externalDbConnectionPoolService.ts | 436 ++++++++++++++ .../src/services/nodeFlowExecutionService.ts | 533 +++++++++++++++--- docs/외부_DB_연결_풀_가이드.md | 491 ++++++++++++++++ .../admin/dataflow/node-editor/page.tsx | 4 +- .../dataflow/node-editor/FlowEditor.tsx | 2 - .../node-editor/nodes/FieldMappingNode.tsx | 66 --- .../node-editor/panels/PropertiesPanel.tsx | 4 - .../panels/properties/ConditionProperties.tsx | 22 + .../properties/FieldMappingProperties.tsx | 191 ------- .../properties/InsertActionProperties.tsx | 86 ++- .../properties/UpdateActionProperties.tsx | 47 +- .../properties/UpsertActionProperties.tsx | 47 +- .../node-editor/sidebar/nodePaletteConfig.ts | 8 - frontend/types/node-editor.ts | 1 - 15 files changed, 1567 insertions(+), 407 deletions(-) create mode 100644 backend-node/src/services/externalDbConnectionPoolService.ts create mode 100644 docs/외부_DB_연결_풀_가이드.md delete mode 100644 frontend/components/dataflow/node-editor/nodes/FieldMappingNode.tsx delete mode 100644 frontend/components/dataflow/node-editor/panels/properties/FieldMappingProperties.tsx diff --git a/backend-node/src/routes/externalDbConnectionRoutes.ts b/backend-node/src/routes/externalDbConnectionRoutes.ts index baeb5f6d..ca7d1600 100644 --- a/backend-node/src/routes/externalDbConnectionRoutes.ts +++ b/backend-node/src/routes/externalDbConnectionRoutes.ts @@ -85,6 +85,42 @@ router.get( } ); +/** + * GET /api/external-db-connections/pool-status + * 연결 풀 상태 조회 + */ +router.get( + "/pool-status", + authenticateToken, + async (req: AuthenticatedRequest, res: Response) => { + try { + const { ExternalDbConnectionPoolService } = await import( + "../services/externalDbConnectionPoolService" + ); + const poolService = ExternalDbConnectionPoolService.getInstance(); + const poolsStatus = poolService.getPoolsStatus(); + + return res.status(200).json({ + success: true, + data: { + totalPools: poolsStatus.length, + activePools: poolsStatus.filter((p) => p.activeConnections > 0) + .length, + pools: poolsStatus, + }, + message: `${poolsStatus.length}개의 연결 풀 상태를 조회했습니다.`, + }); + } catch (error) { + console.error("연결 풀 상태 조회 오류:", error); + return res.status(500).json({ + success: false, + message: "서버 내부 오류가 발생했습니다.", + error: error instanceof Error ? error.message : "알 수 없는 오류", + }); + } + } +); + /** * GET /api/external-db-connections/grouped * DB 타입별로 그룹화된 외부 DB 연결 목록 조회 diff --git a/backend-node/src/services/externalDbConnectionPoolService.ts b/backend-node/src/services/externalDbConnectionPoolService.ts new file mode 100644 index 00000000..940787c3 --- /dev/null +++ b/backend-node/src/services/externalDbConnectionPoolService.ts @@ -0,0 +1,436 @@ +// 외부 DB 연결 풀 관리 서비스 +// 작성일: 2025-01-13 +// 연결 풀 고갈 방지를 위한 중앙 관리 시스템 + +import { Pool } from "pg"; +import mysql from "mysql2/promise"; +import { ExternalDbConnection } from "../types/externalDbTypes"; +import { ExternalDbConnectionService } from "./externalDbConnectionService"; +import { PasswordEncryption } from "../utils/passwordEncryption"; +import logger from "../utils/logger"; + +/** + * 연결 풀 래퍼 인터페이스 + * 모든 DB 타입의 연결 풀을 통일된 방식으로 관리 + */ +interface ConnectionPoolWrapper { + pool: any; // 실제 연결 풀 객체 + dbType: string; + connectionId: number; + createdAt: Date; + lastUsedAt: Date; + activeConnections: number; + maxConnections: number; + + // 통일된 쿼리 실행 인터페이스 + query(sql: string, params?: any[]): Promise; + + // 연결 풀 종료 + disconnect(): Promise; + + // 연결 풀 상태 확인 + isHealthy(): boolean; +} + +/** + * PostgreSQL 연결 풀 래퍼 + */ +class PostgresPoolWrapper implements ConnectionPoolWrapper { + pool: Pool; + dbType = "postgresql"; + connectionId: number; + createdAt: Date; + lastUsedAt: Date; + activeConnections = 0; + maxConnections: number; + + constructor(config: ExternalDbConnection) { + this.connectionId = config.id!; + this.createdAt = new Date(); + this.lastUsedAt = new Date(); + this.maxConnections = config.max_connections || 10; + + this.pool = new Pool({ + host: config.host, + port: config.port, + database: config.database_name, + user: config.username, + password: config.password, + max: this.maxConnections, + min: 2, // 최소 연결 수 + idleTimeoutMillis: 30000, // 30초 동안 사용되지 않으면 연결 해제 + connectionTimeoutMillis: (config.connection_timeout || 30) * 1000, + statement_timeout: (config.query_timeout || 60) * 1000, + ssl: config.ssl_enabled === "Y" ? { rejectUnauthorized: false } : false, + }); + + // 연결 풀 이벤트 리스너 + this.pool.on("connect", () => { + this.activeConnections++; + logger.debug( + `[PostgreSQL] 새 연결 생성 (${this.activeConnections}/${this.maxConnections})` + ); + }); + + this.pool.on("remove", () => { + this.activeConnections--; + logger.debug( + `[PostgreSQL] 연결 제거 (${this.activeConnections}/${this.maxConnections})` + ); + }); + + this.pool.on("error", (err) => { + logger.error(`[PostgreSQL] 연결 풀 오류:`, err); + }); + } + + async query(sql: string, params?: any[]): Promise { + this.lastUsedAt = new Date(); + const result = await this.pool.query(sql, params); + return result.rows; + } + + async disconnect(): Promise { + await this.pool.end(); + logger.info(`[PostgreSQL] 연결 풀 종료 (ID: ${this.connectionId})`); + } + + isHealthy(): boolean { + return ( + this.pool.totalCount > 0 && this.activeConnections < this.maxConnections + ); + } +} + +/** + * MySQL/MariaDB 연결 풀 래퍼 + */ +class MySQLPoolWrapper implements ConnectionPoolWrapper { + pool: mysql.Pool; + dbType: string; + connectionId: number; + createdAt: Date; + lastUsedAt: Date; + activeConnections = 0; + maxConnections: number; + + constructor(config: ExternalDbConnection) { + this.connectionId = config.id!; + this.dbType = config.db_type; + this.createdAt = new Date(); + this.lastUsedAt = new Date(); + this.maxConnections = config.max_connections || 10; + + this.pool = mysql.createPool({ + host: config.host, + port: config.port, + database: config.database_name, + user: config.username, + password: config.password, + connectionLimit: this.maxConnections, + waitForConnections: true, + queueLimit: 0, + connectTimeout: (config.connection_timeout || 30) * 1000, + ssl: + config.ssl_enabled === "Y" ? { rejectUnauthorized: false } : undefined, + }); + + // 연결 획득/해제 이벤트 추적 + this.pool.on("acquire", () => { + this.activeConnections++; + logger.debug( + `[${this.dbType.toUpperCase()}] 연결 획득 (${this.activeConnections}/${this.maxConnections})` + ); + }); + + this.pool.on("release", () => { + this.activeConnections--; + logger.debug( + `[${this.dbType.toUpperCase()}] 연결 반환 (${this.activeConnections}/${this.maxConnections})` + ); + }); + } + + async query(sql: string, params?: any[]): Promise { + this.lastUsedAt = new Date(); + const [rows] = await this.pool.execute(sql, params); + return rows; + } + + async disconnect(): Promise { + await this.pool.end(); + logger.info( + `[${this.dbType.toUpperCase()}] 연결 풀 종료 (ID: ${this.connectionId})` + ); + } + + isHealthy(): boolean { + return this.activeConnections < this.maxConnections; + } +} + +/** + * 외부 DB 연결 풀 관리자 + * 싱글톤 패턴으로 구현하여 전역적으로 연결 풀 관리 + */ +export class ExternalDbConnectionPoolService { + private static instance: ExternalDbConnectionPoolService; + private pools: Map = new Map(); + private readonly IDLE_TIMEOUT = 10 * 60 * 1000; // 10분 동안 사용되지 않으면 풀 제거 + private readonly HEALTH_CHECK_INTERVAL = 60 * 1000; // 1분마다 헬스 체크 + private healthCheckTimer?: NodeJS.Timeout; + + private constructor() { + this.startHealthCheck(); + logger.info("🔌 외부 DB 연결 풀 서비스 초기화 완료"); + } + + /** + * 싱글톤 인스턴스 반환 + */ + static getInstance(): ExternalDbConnectionPoolService { + if (!ExternalDbConnectionPoolService.instance) { + ExternalDbConnectionPoolService.instance = + new ExternalDbConnectionPoolService(); + } + return ExternalDbConnectionPoolService.instance; + } + + /** + * 연결 풀 가져오기 (없으면 생성) + */ + async getPool(connectionId: number): Promise { + // 기존 풀이 있으면 반환 + if (this.pools.has(connectionId)) { + const pool = this.pools.get(connectionId)!; + pool.lastUsedAt = new Date(); + + // 헬스 체크 + if (!pool.isHealthy()) { + logger.warn( + `⚠️ 연결 풀 비정상 감지 (ID: ${connectionId}), 재생성 중...` + ); + await this.removePool(connectionId); + return this.createPool(connectionId); + } + + logger.debug(`✅ 기존 연결 풀 재사용 (ID: ${connectionId})`); + return pool; + } + + // 새로운 풀 생성 + return this.createPool(connectionId); + } + + /** + * 새로운 연결 풀 생성 + */ + private async createPool( + connectionId: number + ): Promise { + logger.info(`🔧 새 연결 풀 생성 중 (ID: ${connectionId})...`); + + // DB 연결 정보 조회 + const connectionResult = + await ExternalDbConnectionService.getConnectionById(connectionId); + + if (!connectionResult.success || !connectionResult.data) { + throw new Error(`연결 정보를 찾을 수 없습니다 (ID: ${connectionId})`); + } + + const config = connectionResult.data; + + // 비활성화된 연결은 사용 불가 + if (config.is_active !== "Y") { + throw new Error(`비활성화된 연결입니다 (ID: ${connectionId})`); + } + + // 비밀번호 복호화 + try { + config.password = PasswordEncryption.decrypt(config.password); + } catch (error) { + logger.error(`비밀번호 복호화 실패 (ID: ${connectionId}):`, error); + throw new Error("비밀번호 복호화에 실패했습니다"); + } + + // DB 타입에 따라 적절한 풀 생성 + let pool: ConnectionPoolWrapper; + + switch (config.db_type.toLowerCase()) { + case "postgresql": + pool = new PostgresPoolWrapper(config); + break; + + case "mysql": + case "mariadb": + pool = new MySQLPoolWrapper(config); + break; + + case "oracle": + case "mssql": + // TODO: Oracle과 MSSQL 지원 추가 + throw new Error(`${config.db_type}는 아직 지원되지 않습니다`); + + default: + throw new Error(`지원하지 않는 DB 타입: ${config.db_type}`); + } + + this.pools.set(connectionId, pool); + logger.info( + `✅ 연결 풀 생성 완료 (ID: ${connectionId}, 타입: ${config.db_type}, 최대: ${pool.maxConnections})` + ); + + return pool; + } + + /** + * 연결 풀 제거 + */ + async removePool(connectionId: number): Promise { + const pool = this.pools.get(connectionId); + if (pool) { + await pool.disconnect(); + this.pools.delete(connectionId); + logger.info(`🗑️ 연결 풀 제거됨 (ID: ${connectionId})`); + } + } + + /** + * 쿼리 실행 (자동으로 연결 풀 관리) + */ + async executeQuery( + connectionId: number, + sql: string, + params?: any[] + ): Promise { + const pool = await this.getPool(connectionId); + + try { + logger.debug( + `📊 쿼리 실행 (ID: ${connectionId}): ${sql.substring(0, 100)}...` + ); + const result = await pool.query(sql, params); + logger.debug( + `✅ 쿼리 완료 (ID: ${connectionId}), 결과: ${result.length}건` + ); + return result; + } catch (error) { + logger.error(`❌ 쿼리 실행 실패 (ID: ${connectionId}):`, error); + throw error; + } + } + + /** + * 연결 테스트 (풀을 생성하지 않고 단순 연결만 테스트) + */ + async testConnection(connectionId: number): Promise { + try { + const pool = await this.getPool(connectionId); + + // 간단한 쿼리로 연결 테스트 + const testQuery = + pool.dbType === "postgresql" ? "SELECT 1 as test" : "SELECT 1 as test"; + + await pool.query(testQuery); + logger.info(`✅ 연결 테스트 성공 (ID: ${connectionId})`); + return true; + } catch (error) { + logger.error(`❌ 연결 테스트 실패 (ID: ${connectionId}):`, error); + return false; + } + } + + /** + * 주기적인 헬스 체크 및 유휴 풀 정리 + */ + private startHealthCheck(): void { + this.healthCheckTimer = setInterval(() => { + const now = Date.now(); + + this.pools.forEach(async (pool, connectionId) => { + const idleTime = now - pool.lastUsedAt.getTime(); + + // 유휴 시간 초과 시 풀 제거 + if (idleTime > this.IDLE_TIMEOUT) { + logger.info( + `🧹 유휴 연결 풀 정리 (ID: ${connectionId}, 유휴: ${Math.round(idleTime / 1000)}초)` + ); + await this.removePool(connectionId); + } + + // 헬스 체크 + if (!pool.isHealthy()) { + logger.warn( + `⚠️ 비정상 연결 풀 감지 (ID: ${connectionId}), 재생성 예약` + ); + await this.removePool(connectionId); + } + }); + + // 상태 로깅 + if (this.pools.size > 0) { + logger.debug( + `📊 연결 풀 상태: 총 ${this.pools.size}개, 활성: ${Array.from(this.pools.values()).filter((p) => p.activeConnections > 0).length}개` + ); + } + }, this.HEALTH_CHECK_INTERVAL); + + logger.info("🔍 헬스 체크 타이머 시작 (간격: 1분)"); + } + + /** + * 모든 연결 풀 종료 (애플리케이션 종료 시 호출) + */ + async closeAll(): Promise { + logger.info(`🛑 모든 연결 풀 종료 중... (총 ${this.pools.size}개)`); + + if (this.healthCheckTimer) { + clearInterval(this.healthCheckTimer); + } + + const closePromises = Array.from(this.pools.keys()).map((connectionId) => + this.removePool(connectionId) + ); + + await Promise.all(closePromises); + logger.info("✅ 모든 연결 풀 종료 완료"); + } + + /** + * 현재 연결 풀 상태 조회 + */ + getPoolsStatus(): Array<{ + connectionId: number; + dbType: string; + activeConnections: number; + maxConnections: number; + createdAt: Date; + lastUsedAt: Date; + idleSeconds: number; + }> { + const now = Date.now(); + + return Array.from(this.pools.entries()).map(([connectionId, pool]) => ({ + connectionId, + dbType: pool.dbType, + activeConnections: pool.activeConnections, + maxConnections: pool.maxConnections, + createdAt: pool.createdAt, + lastUsedAt: pool.lastUsedAt, + idleSeconds: Math.round((now - pool.lastUsedAt.getTime()) / 1000), + })); + } +} + +// 애플리케이션 종료 시 연결 풀 정리 +process.on("SIGINT", async () => { + logger.info("🛑 SIGINT 신호 수신, 연결 풀 정리 중..."); + await ExternalDbConnectionPoolService.getInstance().closeAll(); + process.exit(0); +}); + +process.on("SIGTERM", async () => { + logger.info("🛑 SIGTERM 신호 수신, 연결 풀 정리 중..."); + await ExternalDbConnectionPoolService.getInstance().closeAll(); + process.exit(0); +}); diff --git a/backend-node/src/services/nodeFlowExecutionService.ts b/backend-node/src/services/nodeFlowExecutionService.ts index 124d2b41..7a887bdb 100644 --- a/backend-node/src/services/nodeFlowExecutionService.ts +++ b/backend-node/src/services/nodeFlowExecutionService.ts @@ -26,7 +26,6 @@ export type NodeType = | "externalDBSource" | "restAPISource" | "condition" - | "fieldMapping" | "dataTransform" | "insertAction" | "updateAction" @@ -429,14 +428,79 @@ export class NodeFlowExecutionService { return context.sourceData; } else if (parents.length === 1) { // 단일 부모: 부모의 결과 데이터 전달 - const parentResult = context.nodeResults.get(parents[0]); - return parentResult?.data || context.sourceData; + const parentId = parents[0]; + const parentResult = context.nodeResults.get(parentId); + let data = parentResult?.data || context.sourceData; + + // 🔥 조건 노드에서 온 데이터인 경우 sourceHandle 확인 + const edge = edges.find( + (e) => e.source === parentId && e.target === nodeId + ); + if ( + edge?.sourceHandle && + data && + typeof data === "object" && + "conditionResult" in data + ) { + // 조건 노드의 결과 객체 + if (edge.sourceHandle === "true") { + logger.info( + `✅ TRUE 브랜치 데이터 사용: ${data.trueData?.length || 0}건` + ); + return data.trueData || []; + } else if (edge.sourceHandle === "false") { + logger.info( + `✅ FALSE 브랜치 데이터 사용: ${data.falseData?.length || 0}건` + ); + return data.falseData || []; + } else { + // sourceHandle이 없거나 다른 값이면 allData 사용 + return data.allData || data; + } + } + + return data; } else { // 다중 부모: 모든 부모의 데이터 병합 - return parents.map((parentId) => { - const result = context.nodeResults.get(parentId); - return result?.data || context.sourceData; + const allData: any[] = []; + + parents.forEach((parentId) => { + const parentResult = context.nodeResults.get(parentId); + let data = parentResult?.data || context.sourceData; + + // 🔥 조건 노드에서 온 데이터인 경우 sourceHandle 확인 + const edge = edges.find( + (e) => e.source === parentId && e.target === nodeId + ); + if ( + edge?.sourceHandle && + data && + typeof data === "object" && + "conditionResult" in data + ) { + // 조건 노드의 결과 객체 + if (edge.sourceHandle === "true") { + data = data.trueData || []; + } else if (edge.sourceHandle === "false") { + data = data.falseData || []; + } else { + data = data.allData || data; + } + } + + // 배열이면 펼쳐서 추가 + if (Array.isArray(data)) { + allData.push(...data); + } else { + allData.push(data); + } }); + + logger.info( + `🔗 다중 부모 병합: ${parents.length}개 부모, 총 ${allData.length}건 데이터` + ); + + return allData; } } @@ -453,6 +517,9 @@ export class NodeFlowExecutionService { case "tableSource": return this.executeTableSource(node, context); + case "externalDBSource": + return this.executeExternalDBSource(node, context); + case "restAPISource": return this.executeRestAPISource(node, context); @@ -603,6 +670,60 @@ export class NodeFlowExecutionService { } } + /** + * 외부 DB 소스 노드 실행 + */ + private static async executeExternalDBSource( + node: FlowNode, + context: ExecutionContext + ): Promise { + const { connectionId, tableName, schema, whereConditions } = node.data; + + if (!connectionId || !tableName) { + throw new Error("외부 DB 연결 정보 또는 테이블명이 설정되지 않았습니다."); + } + + logger.info(`🔌 외부 DB 소스 조회: ${connectionId}.${tableName}`); + + try { + // 연결 풀 서비스 임포트 (동적 임포트로 순환 참조 방지) + const { ExternalDbConnectionPoolService } = await import( + "./externalDbConnectionPoolService" + ); + const poolService = ExternalDbConnectionPoolService.getInstance(); + + // 스키마 접두사 처리 + const schemaPrefix = schema ? `${schema}.` : ""; + const fullTableName = `${schemaPrefix}${tableName}`; + + // WHERE 절 생성 + let sql = `SELECT * FROM ${fullTableName}`; + let params: any[] = []; + + if (whereConditions && whereConditions.length > 0) { + const whereResult = this.buildWhereClause(whereConditions); + sql += ` ${whereResult.clause}`; + params = whereResult.values; + } + + logger.info(`📊 외부 DB 쿼리 실행: ${sql}`); + + // 연결 풀을 통해 쿼리 실행 + const result = await poolService.executeQuery(connectionId, sql, params); + + logger.info( + `✅ 외부 DB 소스 조회 완료: ${tableName}, ${result.length}건` + ); + + return result; + } catch (error: any) { + logger.error(`❌ 외부 DB 소스 조회 실패:`, error); + throw new Error( + `외부 DB 조회 실패 (연결 ID: ${connectionId}): ${error.message}` + ); + } + } + /** * 테이블 소스 노드 실행 */ @@ -633,13 +754,13 @@ export class NodeFlowExecutionService { } const schemaPrefix = schema ? `${schema}.` : ""; - const whereClause = whereConditions - ? `WHERE ${this.buildWhereClause(whereConditions)}` - : ""; + const whereResult = whereConditions + ? this.buildWhereClause(whereConditions) + : { clause: "", values: [] }; - const sql = `SELECT * FROM ${schemaPrefix}${tableName} ${whereClause}`; + const sql = `SELECT * FROM ${schemaPrefix}${tableName} ${whereResult.clause}`; - const result = await query(sql, []); + const result = await query(sql, whereResult.values); logger.info(`📊 테이블 소스 조회: ${tableName}, ${result.length}건`); @@ -703,11 +824,15 @@ export class NodeFlowExecutionService { const executeInsert = async (txClient: any) => { const dataArray = Array.isArray(inputData) ? inputData : [inputData]; let insertedCount = 0; + const insertedDataArray: any[] = []; for (const data of dataArray) { const fields: string[] = []; const values: any[] = []; + // 🔥 삽입된 데이터 복사본 생성 + const insertedData = { ...data }; + console.log("🗺️ 필드 매핑 처리 중..."); fieldMappings.forEach((mapping: any) => { fields.push(mapping.targetField); @@ -720,25 +845,38 @@ export class NodeFlowExecutionService { ` ${mapping.sourceField} → ${mapping.targetField}: ${value === undefined ? "❌ undefined" : "✅ " + value}` ); values.push(value); + + // 🔥 삽입된 값을 데이터에 반영 + insertedData[mapping.targetField] = value; }); const sql = ` INSERT INTO ${targetTable} (${fields.join(", ")}) VALUES (${fields.map((_, i) => `$${i + 1}`).join(", ")}) + RETURNING * `; console.log("📝 실행할 SQL:", sql); console.log("📊 바인딩 값:", values); - await txClient.query(sql, values); + const result = await txClient.query(sql, values); insertedCount++; + + // 🔥 RETURNING으로 받은 실제 삽입 데이터 사용 (AUTO_INCREMENT 등 포함) + if (result.rows && result.rows.length > 0) { + insertedDataArray.push(result.rows[0]); + } else { + // RETURNING이 없으면 생성한 데이터 사용 + insertedDataArray.push(insertedData); + } } logger.info( `✅ INSERT 완료 (내부 DB): ${targetTable}, ${insertedCount}건` ); - return { insertedCount }; + // 🔥 삽입된 데이터 반환 (AUTO_INCREMENT ID 등 포함) + return insertedDataArray; }; // 🔥 클라이언트가 전달되었으면 사용, 아니면 독립 트랜잭션 생성 @@ -781,6 +919,7 @@ export class NodeFlowExecutionService { try { const dataArray = Array.isArray(inputData) ? inputData : [inputData]; let insertedCount = 0; + const insertedDataArray: any[] = []; // 🔥 Oracle의 경우 autoCommit을 false로 설정하여 트랜잭션 제어 const isOracle = externalDbType.toLowerCase() === "oracle"; @@ -788,6 +927,7 @@ export class NodeFlowExecutionService { for (const data of dataArray) { const fields: string[] = []; const values: any[] = []; + const insertedData: any = { ...data }; fieldMappings.forEach((mapping: any) => { fields.push(mapping.targetField); @@ -796,6 +936,8 @@ export class NodeFlowExecutionService { ? mapping.staticValue : data[mapping.sourceField]; values.push(value); + // 🔥 삽입된 데이터 객체에 매핑된 값 적용 + insertedData[mapping.targetField] = value; }); // 외부 DB별 SQL 문법 차이 처리 @@ -828,6 +970,7 @@ export class NodeFlowExecutionService { await connector.executeQuery(sql, params); insertedCount++; + insertedDataArray.push(insertedData); } // 🔥 Oracle의 경우 명시적 COMMIT @@ -841,7 +984,8 @@ export class NodeFlowExecutionService { `✅ INSERT 완료 (외부 DB): ${externalTargetTable}, ${insertedCount}건` ); - return { insertedCount }; + // 🔥 삽입된 데이터 반환 (외부 DB는 자동 생성 ID 없으므로 입력 데이터 기반) + return insertedDataArray; } catch (error) { // 🔥 Oracle의 경우 오류 시 ROLLBACK await this.rollbackExternalTransaction(connector, externalDbType); @@ -985,38 +1129,28 @@ export class NodeFlowExecutionService { connectionId: number, dbType: string ): Promise { - // 외부 DB 커넥션 정보 조회 - const connectionData: any = await queryOne( - "SELECT * FROM external_db_connections WHERE id = $1", - [connectionId] + // 🔥 연결 풀 서비스를 통한 연결 관리 (연결 풀 고갈 방지) + const { ExternalDbConnectionPoolService } = await import( + "./externalDbConnectionPoolService" ); + const poolService = ExternalDbConnectionPoolService.getInstance(); + const pool = await poolService.getPool(connectionId); - if (!connectionData) { - throw new Error(`외부 DB 커넥션을 찾을 수 없습니다: ${connectionId}`); - } - - // 패스워드 복호화 - const { EncryptUtil } = await import("../utils/encryptUtil"); - const decryptedPassword = EncryptUtil.decrypt(connectionData.password); - - const config = { - host: connectionData.host, - port: connectionData.port, - database: connectionData.database_name, - user: connectionData.username, - password: decryptedPassword, + // DatabaseConnectorFactory와 호환되도록 래퍼 객체 반환 + return { + executeQuery: async (sql: string, params?: any[]) => { + const result = await pool.query(sql, params); + return { + rows: Array.isArray(result) ? result : [result], + rowCount: Array.isArray(result) ? result.length : 1, + affectedRows: Array.isArray(result) ? result.length : 1, + }; + }, + disconnect: async () => { + // 연결 풀은 자동 관리되므로 즉시 종료하지 않음 + logger.debug(`📌 연결 풀 유지 (ID: ${connectionId})`); + }, }; - - // DatabaseConnectorFactory를 사용하여 외부 DB 연결 - const { DatabaseConnectorFactory } = await import( - "../database/DatabaseConnectorFactory" - ); - - return await DatabaseConnectorFactory.createConnector( - dbType, - config, - connectionId - ); } /** @@ -1107,12 +1241,16 @@ export class NodeFlowExecutionService { const executeUpdate = async (txClient: any) => { const dataArray = Array.isArray(inputData) ? inputData : [inputData]; let updatedCount = 0; + const updatedDataArray: any[] = []; for (const data of dataArray) { const setClauses: string[] = []; const values: any[] = []; let paramIndex = 1; + // 🔥 업데이트된 데이터 복사본 생성 + const updatedData = { ...data }; + console.log("🗺️ 필드 매핑 처리 중..."); fieldMappings.forEach((mapping: any) => { const value = @@ -1123,21 +1261,35 @@ export class NodeFlowExecutionService { console.log( ` ${mapping.sourceField} → ${mapping.targetField}: ${value === undefined ? "❌ undefined" : "✅ " + value}` ); - setClauses.push(`${mapping.targetField} = $${paramIndex}`); - values.push(value); - paramIndex++; + + // targetField가 비어있지 않은 경우만 추가 + if (mapping.targetField) { + setClauses.push(`${mapping.targetField} = $${paramIndex}`); + values.push(value); + paramIndex++; + + // 🔥 업데이트된 값을 데이터에 반영 + updatedData[mapping.targetField] = value; + } else { + console.log( + `⚠️ targetField가 비어있어 스킵: ${mapping.sourceField}` + ); + } }); - const whereClause = this.buildWhereClause( + const whereResult = this.buildWhereClause( whereConditions, data, paramIndex ); + // WHERE 절의 값들을 values 배열에 추가 + values.push(...whereResult.values); + const sql = ` UPDATE ${targetTable} SET ${setClauses.join(", ")} - ${whereClause} + ${whereResult.clause} `; console.log("📝 실행할 SQL:", sql); @@ -1145,13 +1297,17 @@ export class NodeFlowExecutionService { const result = await txClient.query(sql, values); updatedCount += result.rowCount || 0; + + // 🔥 업데이트된 데이터 저장 + updatedDataArray.push(updatedData); } logger.info( `✅ UPDATE 완료 (내부 DB): ${targetTable}, ${updatedCount}건` ); - return { updatedCount }; + // 🔥 업데이트된 데이터 반환 (다음 노드에서 사용) + return updatedDataArray; }; // 🔥 클라이언트가 전달되었으면 사용, 아니면 독립 트랜잭션 생성 @@ -1195,11 +1351,13 @@ export class NodeFlowExecutionService { try { const dataArray = Array.isArray(inputData) ? inputData : [inputData]; let updatedCount = 0; + const updatedDataArray: any[] = []; for (const data of dataArray) { const setClauses: string[] = []; const values: any[] = []; let paramIndex = 1; + const updatedData: any = { ...data }; fieldMappings.forEach((mapping: any) => { const value = @@ -1222,6 +1380,8 @@ export class NodeFlowExecutionService { values.push(value); paramIndex++; + // 🔥 업데이트된 데이터 객체에 매핑된 값 적용 + updatedData[mapping.targetField] = value; }); // WHERE 조건 생성 @@ -1263,6 +1423,7 @@ export class NodeFlowExecutionService { const result = await connector.executeQuery(sql, values); updatedCount += result.rowCount || result.affectedRows || 0; + updatedDataArray.push(updatedData); } // 🔥 Oracle의 경우 명시적 COMMIT @@ -1276,7 +1437,8 @@ export class NodeFlowExecutionService { `✅ UPDATE 완료 (외부 DB): ${externalTargetTable}, ${updatedCount}건` ); - return { updatedCount }; + // 🔥 업데이트된 데이터 반환 + return updatedDataArray; } catch (error) { // 🔥 Oracle의 경우 오류 시 ROLLBACK await this.rollbackExternalTransaction(connector, externalDbType); @@ -1439,24 +1601,32 @@ export class NodeFlowExecutionService { const executeDelete = async (txClient: any) => { const dataArray = Array.isArray(inputData) ? inputData : [inputData]; let deletedCount = 0; + const deletedDataArray: any[] = []; for (const data of dataArray) { console.log("🔍 WHERE 조건 처리 중..."); - const whereClause = this.buildWhereClause(whereConditions, data, 1); + const whereResult = this.buildWhereClause(whereConditions, data, 1); - const sql = `DELETE FROM ${targetTable} ${whereClause}`; + const sql = `DELETE FROM ${targetTable} ${whereResult.clause} RETURNING *`; console.log("📝 실행할 SQL:", sql); + console.log("📊 바인딩 값:", whereResult.values); - const result = await txClient.query(sql, []); + const result = await txClient.query(sql, whereResult.values); deletedCount += result.rowCount || 0; + + // 🔥 RETURNING으로 받은 삭제된 데이터 저장 + if (result.rows && result.rows.length > 0) { + deletedDataArray.push(...result.rows); + } } logger.info( `✅ DELETE 완료 (내부 DB): ${targetTable}, ${deletedCount}건` ); - return { deletedCount }; + // 🔥 삭제된 데이터 반환 (로그 기록 등에 사용) + return deletedDataArray; }; // 🔥 클라이언트가 전달되었으면 사용, 아니면 독립 트랜잭션 생성 @@ -1499,6 +1669,7 @@ export class NodeFlowExecutionService { try { const dataArray = Array.isArray(inputData) ? inputData : [inputData]; let deletedCount = 0; + const deletedDataArray: any[] = []; for (const data of dataArray) { const whereClauses: string[] = []; @@ -1545,9 +1716,16 @@ export class NodeFlowExecutionService { ); } - const sql = `DELETE FROM ${externalTargetTable} ${whereClause}`; + // 🔥 삭제 전에 데이터 조회 (로그 기록 용도) + const selectSql = `SELECT * FROM ${externalTargetTable} ${whereClause}`; + const selectResult = await connector.executeQuery(selectSql, values); + if (selectResult && selectResult.length > 0) { + deletedDataArray.push(...selectResult); + } - const result = await connector.executeQuery(sql, values); + // 실제 삭제 수행 + const deleteSql = `DELETE FROM ${externalTargetTable} ${whereClause}`; + const result = await connector.executeQuery(deleteSql, values); deletedCount += result.rowCount || result.affectedRows || 0; } @@ -1562,7 +1740,8 @@ export class NodeFlowExecutionService { `✅ DELETE 완료 (외부 DB): ${externalTargetTable}, ${deletedCount}건` ); - return { deletedCount }; + // 🔥 삭제된 데이터 반환 + return deletedDataArray; } catch (error) { // 🔥 Oracle의 경우 오류 시 ROLLBACK await this.rollbackExternalTransaction(connector, externalDbType); @@ -2135,16 +2314,93 @@ export class NodeFlowExecutionService { node: FlowNode, inputData: any, context: ExecutionContext - ): Promise { + ): Promise { const { conditions, logic } = node.data; + logger.info( + `🔍 조건 노드 실행 - inputData 타입: ${typeof inputData}, 배열 여부: ${Array.isArray(inputData)}, 길이: ${Array.isArray(inputData) ? inputData.length : "N/A"}` + ); + logger.info(`🔍 조건 개수: ${conditions?.length || 0}, 로직: ${logic}`); + + if (inputData) { + console.log( + "📥 조건 노드 입력 데이터:", + JSON.stringify(inputData, null, 2).substring(0, 500) + ); + } else { + console.log("⚠️ 조건 노드 입력 데이터가 없습니다!"); + } + + // 조건이 없으면 모든 데이터 통과 + if (!conditions || conditions.length === 0) { + logger.info("⚠️ 조건이 설정되지 않음 - 모든 데이터 통과"); + const dataArray = Array.isArray(inputData) ? inputData : [inputData]; + return { + conditionResult: true, + trueData: dataArray, + falseData: [], + allData: dataArray, + }; + } + + // inputData가 배열인 경우 각 항목을 필터링 + if (Array.isArray(inputData)) { + const trueData: any[] = []; + const falseData: any[] = []; + + inputData.forEach((item: any) => { + const results = conditions.map((condition: any) => { + const fieldValue = item[condition.field]; + + let compareValue = condition.value; + if (condition.valueType === "field") { + compareValue = item[condition.value]; + logger.info( + `🔄 필드 참조 비교: ${condition.field} (${fieldValue}) vs ${condition.value} (${compareValue})` + ); + } else { + logger.info( + `📊 고정값 비교: ${condition.field} (${fieldValue}) vs ${compareValue}` + ); + } + + return this.evaluateCondition( + fieldValue, + condition.operator, + compareValue + ); + }); + + const result = + logic === "OR" + ? results.some((r: boolean) => r) + : results.every((r: boolean) => r); + + if (result) { + trueData.push(item); + } else { + falseData.push(item); + } + }); + + logger.info( + `🔍 조건 필터링 결과: TRUE ${trueData.length}건 / FALSE ${falseData.length}건 (${logic} 로직)` + ); + + return { + conditionResult: trueData.length > 0, + trueData, + falseData, + allData: inputData, + }; + } + + // 단일 객체인 경우 const results = conditions.map((condition: any) => { const fieldValue = inputData[condition.field]; - // 🔥 비교 값 타입 확인: "field" (필드 참조) 또는 "static" (고정값) let compareValue = condition.value; if (condition.valueType === "field") { - // 필드 참조: inputData에서 해당 필드의 값을 가져옴 compareValue = inputData[condition.value]; logger.info( `🔄 필드 참조 비교: ${condition.field} (${fieldValue}) vs ${condition.value} (${compareValue})` @@ -2169,7 +2425,15 @@ export class NodeFlowExecutionService { logger.info(`🔍 조건 평가 결과: ${result} (${logic} 로직)`); - return result; + // ⚠️ 조건 노드는 TRUE/FALSE 브랜치를 위한 특별한 처리 필요 + // 조건 결과를 저장하고, 원본 데이터는 항상 반환 + // 다음 노드에서 sourceHandle을 기반으로 필터링됨 + return { + conditionResult: result, + trueData: result ? [inputData] : [], + falseData: result ? [] : [inputData], + allData: [inputData], // 일단 모든 데이터 전달 + }; } /** @@ -2179,17 +2443,71 @@ export class NodeFlowExecutionService { conditions: any[], data?: any, startIndex: number = 1 - ): string { + ): { clause: string; values: any[] } { if (!conditions || conditions.length === 0) { - return ""; + return { clause: "", values: [] }; } + const values: any[] = []; const clauses = conditions.map((condition, index) => { const value = data ? data[condition.field] : condition.value; - return `${condition.field} ${condition.operator} $${startIndex + index}`; + values.push(value); + + // 연산자를 SQL 문법으로 변환 + let sqlOperator = condition.operator; + switch (condition.operator.toUpperCase()) { + case "EQUALS": + sqlOperator = "="; + break; + case "NOT_EQUALS": + case "NOTEQUALS": + sqlOperator = "!="; + break; + case "GREATER_THAN": + case "GREATERTHAN": + sqlOperator = ">"; + break; + case "LESS_THAN": + case "LESSTHAN": + sqlOperator = "<"; + break; + case "GREATER_THAN_OR_EQUAL": + case "GREATERTHANOREQUAL": + sqlOperator = ">="; + break; + case "LESS_THAN_OR_EQUAL": + case "LESSTHANOREQUAL": + sqlOperator = "<="; + break; + case "LIKE": + sqlOperator = "LIKE"; + break; + case "NOT_LIKE": + case "NOTLIKE": + sqlOperator = "NOT LIKE"; + break; + case "IN": + sqlOperator = "IN"; + break; + case "NOT_IN": + case "NOTIN": + sqlOperator = "NOT IN"; + break; + case "IS_NULL": + case "ISNULL": + return `${condition.field} IS NULL`; + case "IS_NOT_NULL": + case "ISNOTNULL": + return `${condition.field} IS NOT NULL`; + default: + // 이미 SQL 문법인 경우 (=, !=, >, < 등) + sqlOperator = condition.operator; + } + + return `${condition.field} ${sqlOperator} $${startIndex + index}`; }); - return `WHERE ${clauses.join(" AND ")}`; + return { clause: `WHERE ${clauses.join(" AND ")}`, values }; } /** @@ -2200,22 +2518,85 @@ export class NodeFlowExecutionService { operator: string, expectedValue: any ): boolean { - switch (operator) { - case "equals": + // NULL 체크 + if (operator === "IS_NULL" || operator === "isNull") { + return ( + fieldValue === null || fieldValue === undefined || fieldValue === "" + ); + } + if (operator === "IS_NOT_NULL" || operator === "isNotNull") { + return ( + fieldValue !== null && fieldValue !== undefined && fieldValue !== "" + ); + } + + // 비교 연산자: 타입 변환 + const normalizedOperator = operator.toUpperCase(); + + switch (normalizedOperator) { + case "EQUALS": case "=": - return fieldValue === expectedValue; - case "notEquals": + return fieldValue == expectedValue; // 느슨한 비교 + + case "NOT_EQUALS": + case "NOTEQUALS": case "!=": - return fieldValue !== expectedValue; - case "greaterThan": + return fieldValue != expectedValue; + + case "GREATER_THAN": + case "GREATERTHAN": case ">": - return fieldValue > expectedValue; - case "lessThan": + return Number(fieldValue) > Number(expectedValue); + + case "LESS_THAN": + case "LESSTHAN": case "<": - return fieldValue < expectedValue; - case "contains": - return String(fieldValue).includes(String(expectedValue)); + return Number(fieldValue) < Number(expectedValue); + + case "GREATER_THAN_OR_EQUAL": + case "GREATERTHANOREQUAL": + case ">=": + return Number(fieldValue) >= Number(expectedValue); + + case "LESS_THAN_OR_EQUAL": + case "LESSTHANOREQUAL": + case "<=": + return Number(fieldValue) <= Number(expectedValue); + + case "LIKE": + case "CONTAINS": + return String(fieldValue) + .toLowerCase() + .includes(String(expectedValue).toLowerCase()); + + case "NOT_LIKE": + case "NOTLIKE": + return !String(fieldValue) + .toLowerCase() + .includes(String(expectedValue).toLowerCase()); + + case "IN": + if (Array.isArray(expectedValue)) { + return expectedValue.includes(fieldValue); + } + // 쉼표로 구분된 문자열 + const inValues = String(expectedValue) + .split(",") + .map((v) => v.trim()); + return inValues.includes(String(fieldValue)); + + case "NOT_IN": + case "NOTIN": + if (Array.isArray(expectedValue)) { + return !expectedValue.includes(fieldValue); + } + const notInValues = String(expectedValue) + .split(",") + .map((v) => v.trim()); + return !notInValues.includes(String(fieldValue)); + default: + logger.warn(`⚠️ 지원되지 않는 연산자: ${operator}`); return false; } } diff --git a/docs/외부_DB_연결_풀_가이드.md b/docs/외부_DB_연결_풀_가이드.md new file mode 100644 index 00000000..df4263f0 --- /dev/null +++ b/docs/외부_DB_연결_풀_가이드.md @@ -0,0 +1,491 @@ +# 외부 DB 연결 풀 관리 가이드 + +## 📋 개요 + +외부 DB 연결 풀 서비스는 여러 외부 데이터베이스와의 연결을 효율적으로 관리하여 **연결 풀 고갈을 방지**하고 성능을 최적화합니다. + +### 주요 기능 + +- ✅ **자동 연결 풀 관리**: 연결 생성, 재사용, 정리 자동화 +- ✅ **연결 풀 고갈 방지**: 최대 연결 수 제한 및 모니터링 +- ✅ **유휴 연결 정리**: 10분 이상 사용되지 않은 풀 자동 제거 +- ✅ **헬스 체크**: 1분마다 모든 풀 상태 검사 +- ✅ **다중 DB 지원**: PostgreSQL, MySQL, MariaDB +- ✅ **싱글톤 패턴**: 전역적으로 단일 인스턴스 사용 + +--- + +## 🏗️ 아키텍처 + +``` +┌─────────────────────────────────────────┐ +│ NodeFlowExecutionService │ +│ (외부 DB 소스/액션 노드) │ +└──────────────┬──────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────┐ +│ ExternalDbConnectionPoolService │ +│ (싱글톤 인스턴스) │ +│ │ +│ ┌─────────────────────────────────┐ │ +│ │ Connection Pool Map │ │ +│ │ ┌──────────────────────────┐ │ │ +│ │ │ ID: 1 → PostgresPool │ │ │ +│ │ │ ID: 2 → MySQLPool │ │ │ +│ │ │ ID: 3 → MariaDBPool │ │ │ +│ │ └──────────────────────────┘ │ │ +│ └─────────────────────────────────┘ │ +│ │ +│ - 자동 풀 생성/제거 │ +│ - 헬스 체크 (1분마다) │ +│ - 유휴 풀 정리 (10분) │ +└─────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────┐ +│ External Databases │ +│ - PostgreSQL │ +│ - MySQL │ +│ - MariaDB │ +└─────────────────────────────────────────┘ +``` + +--- + +## 🔧 연결 풀 설정 + +### PostgreSQL 연결 풀 + +```typescript +{ + max: 10, // 최대 연결 수 + min: 2, // 최소 연결 수 + idleTimeoutMillis: 30000, // 30초 유휴 시 연결 해제 + connectionTimeoutMillis: 30000, // 연결 타임아웃 30초 + statement_timeout: 60000, // 쿼리 타임아웃 60초 +} +``` + +### MySQL/MariaDB 연결 풀 + +```typescript +{ + connectionLimit: 10, // 최대 연결 수 + waitForConnections: true, + queueLimit: 0, // 대기열 무제한 + connectTimeout: 30000, // 연결 타임아웃 30초 +} +``` + +--- + +## 📊 연결 풀 라이프사이클 + +### 1. 풀 생성 + +```typescript +// 첫 요청 시 자동 생성 +const pool = await poolService.getPool(connectionId); +``` + +**생성 시점**: + +- 외부 DB 소스 노드 첫 실행 시 +- 외부 DB 액션 노드 첫 실행 시 + +**생성 과정**: + +1. DB 연결 정보 조회 (`external_db_connections` 테이블) +2. 비밀번호 복호화 +3. DB 타입에 맞는 연결 풀 생성 (PostgreSQL, MySQL, MariaDB) +4. 이벤트 리스너 등록 (연결 획득/해제 추적) + +### 2. 풀 재사용 + +```typescript +// 기존 풀이 있으면 재사용 +if (this.pools.has(connectionId)) { + const pool = this.pools.get(connectionId)!; + pool.lastUsedAt = new Date(); // 사용 시간 갱신 + return pool; +} +``` + +**재사용 조건**: + +- 동일한 `connectionId`로 요청 +- 풀이 정상 상태 (`isHealthy()` 통과) + +### 3. 자동 정리 + +**유휴 시간 초과 (10분)**: + +```typescript +const IDLE_TIMEOUT = 10 * 60 * 1000; // 10분 + +if (now - pool.lastUsedAt.getTime() > IDLE_TIMEOUT) { + await this.removePool(connectionId); +} +``` + +**헬스 체크 실패**: + +```typescript +if (!pool.isHealthy()) { + await this.removePool(connectionId); +} +``` + +--- + +## 🔍 헬스 체크 시스템 + +### 주기적 헬스 체크 + +```typescript +const HEALTH_CHECK_INTERVAL = 60 * 1000; // 1분마다 + +setInterval(() => { + this.pools.forEach(async (pool, connectionId) => { + // 유휴 시간 체크 + const idleTime = now - pool.lastUsedAt.getTime(); + if (idleTime > IDLE_TIMEOUT) { + await this.removePool(connectionId); + } + + // 헬스 체크 + if (!pool.isHealthy()) { + await this.removePool(connectionId); + } + }); +}, HEALTH_CHECK_INTERVAL); +``` + +### 헬스 체크 조건 + +#### PostgreSQL + +```typescript +isHealthy(): boolean { + return this.pool.totalCount > 0 + && this.activeConnections < this.maxConnections; +} +``` + +#### MySQL/MariaDB + +```typescript +isHealthy(): boolean { + return this.activeConnections < this.maxConnections; +} +``` + +--- + +## 💻 사용 방법 + +### 1. 외부 DB 소스 노드에서 사용 + +```typescript +// nodeFlowExecutionService.ts +private static async executeExternalDBSource( + node: FlowNode, + context: ExecutionContext +): Promise { + const { connectionId, tableName } = node.data; + + // 연결 풀 서비스 사용 + const { ExternalDbConnectionPoolService } = await import( + "./externalDbConnectionPoolService" + ); + const poolService = ExternalDbConnectionPoolService.getInstance(); + + const sql = `SELECT * FROM ${tableName}`; + const result = await poolService.executeQuery(connectionId, sql); + + return result; +} +``` + +### 2. 외부 DB 액션 노드에서 사용 + +```typescript +// 기존 createExternalConnector가 자동으로 연결 풀 사용 +const connector = await this.createExternalConnector(connectionId, dbType); + +// executeQuery 호출 시 내부적으로 연결 풀 사용 +const result = await connector.executeQuery(sql, params); +``` + +### 3. 연결 풀 상태 조회 + +**API 엔드포인트**: + +``` +GET /api/external-db-connections/pool-status +``` + +**응답 예시**: + +```json +{ + "success": true, + "data": { + "totalPools": 3, + "activePools": 2, + "pools": [ + { + "connectionId": 1, + "dbType": "postgresql", + "activeConnections": 2, + "maxConnections": 10, + "createdAt": "2025-01-13T10:00:00.000Z", + "lastUsedAt": "2025-01-13T10:05:00.000Z", + "idleSeconds": 45 + }, + { + "connectionId": 2, + "dbType": "mysql", + "activeConnections": 0, + "maxConnections": 10, + "createdAt": "2025-01-13T09:50:00.000Z", + "lastUsedAt": "2025-01-13T09:55:00.000Z", + "idleSeconds": 600 + } + ] + }, + "message": "3개의 연결 풀 상태를 조회했습니다." +} +``` + +--- + +## 🚨 연결 풀 고갈 방지 메커니즘 + +### 1. 최대 연결 수 제한 + +```typescript +// 데이터베이스 설정 기준 +max_connections: config.max_connections || 10; +``` + +각 외부 DB 연결마다 최대 연결 수를 설정하여 무제한 연결 방지. + +### 2. 연결 재사용 + +```typescript +// 동일한 connectionId 요청 시 기존 풀 재사용 +const pool = await poolService.getPool(connectionId); +``` + +매번 새 연결을 생성하지 않고 기존 풀 재사용. + +### 3. 자동 연결 해제 + +```typescript +// PostgreSQL: 30초 유휴 시 자동 해제 +idleTimeoutMillis: 30000; +``` + +사용되지 않는 연결은 자동으로 해제하여 리소스 절약. + +### 4. 전역 풀 정리 + +```typescript +// 10분 이상 미사용 풀 제거 +if (idleTime > IDLE_TIMEOUT) { + await this.removePool(connectionId); +} +``` + +장시간 사용되지 않는 풀 자체를 제거. + +### 5. 애플리케이션 종료 시 정리 + +```typescript +process.on("SIGINT", async () => { + await ExternalDbConnectionPoolService.getInstance().closeAll(); + process.exit(0); +}); +``` + +프로세스 종료 시 모든 연결 정상 종료. + +--- + +## 📈 모니터링 및 로깅 + +### 연결 이벤트 로깅 + +```typescript +// 연결 획득 +pool.on("acquire", () => { + logger.debug(`[PostgreSQL] 연결 획득 (2/10)`); +}); + +// 연결 반환 +pool.on("release", () => { + logger.debug(`[PostgreSQL] 연결 반환 (1/10)`); +}); + +// 에러 발생 +pool.on("error", (err) => { + logger.error(`[PostgreSQL] 연결 풀 오류:`, err); +}); +``` + +### 정기 상태 로깅 + +```typescript +// 1분마다 상태 출력 +logger.debug(`📊 연결 풀 상태: 총 3개, 활성: 2개`); +``` + +### 주요 로그 메시지 + +| 레벨 | 메시지 | 의미 | +| ------- | ---------------------------------------------------------- | --------------- | +| `info` | `🔧 새 연결 풀 생성 중 (ID: 1)...` | 새 풀 생성 시작 | +| `info` | `✅ 연결 풀 생성 완료 (ID: 1, 타입: postgresql, 최대: 10)` | 풀 생성 완료 | +| `debug` | `✅ 기존 연결 풀 재사용 (ID: 1)` | 기존 풀 재사용 | +| `info` | `🧹 유휴 연결 풀 정리 (ID: 2, 유휴: 620초)` | 유휴 풀 제거 | +| `warn` | `⚠️ 연결 풀 비정상 감지 (ID: 3), 재생성 중...` | 헬스 체크 실패 | +| `error` | `❌ 쿼리 실행 실패 (ID: 1)` | 쿼리 오류 | + +--- + +## 🔒 보안 고려사항 + +### 1. 비밀번호 보호 + +```typescript +// 비밀번호 복호화는 풀 생성 시에만 수행 +config.password = PasswordEncryption.decrypt(config.password); +``` + +메모리에 평문 비밀번호를 최소한으로 유지. + +### 2. 연결 정보 검증 + +```typescript +if (config.is_active !== "Y") { + throw new Error(`비활성화된 연결입니다 (ID: ${connectionId})`); +} +``` + +비활성화된 연결은 사용 불가. + +### 3. 타임아웃 설정 + +```typescript +connectionTimeoutMillis: 30000, // 30초 +statement_timeout: 60000, // 60초 +``` + +무한 대기 방지. + +--- + +## 🐛 트러블슈팅 + +### 문제 1: 연결 풀 고갈 + +**증상**: "Connection pool exhausted" 오류 + +**원인**: + +- 동시 요청이 최대 연결 수 초과 +- 쿼리가 너무 오래 실행되어 연결 점유 + +**해결**: + +1. `max_connections` 값 증가 (`external_db_connections` 테이블) +2. 쿼리 최적화 (인덱스, LIMIT 추가) +3. `query_timeout` 값 조정 + +### 문제 2: 메모리 누수 + +**증상**: 메모리 사용량 지속 증가 + +**원인**: + +- 연결 풀이 정리되지 않음 +- 헬스 체크 실패 + +**해결**: + +1. 연결 풀 상태 확인: `GET /api/external-db-connections/pool-status` +2. 수동 재시작으로 모든 풀 정리 +3. 로그에서 `🧹 유휴 연결 풀 정리` 메시지 확인 + +### 문제 3: 연결 시간 초과 + +**증상**: "Connection timeout" 오류 + +**원인**: + +- DB 서버 응답 없음 +- 네트워크 문제 +- 방화벽 차단 + +**해결**: + +1. DB 서버 상태 확인 +2. 네트워크 연결 확인 +3. `connection_timeout` 값 증가 + +--- + +## ⚙️ 설정 권장사항 + +### 소규모 시스템 (동시 사용자 < 50) + +```typescript +{ + max_connections: 5, + connection_timeout: 30, + query_timeout: 60, +} +``` + +### 중규모 시스템 (동시 사용자 50-200) + +```typescript +{ + max_connections: 10, + connection_timeout: 30, + query_timeout: 90, +} +``` + +### 대규모 시스템 (동시 사용자 > 200) + +```typescript +{ + max_connections: 20, + connection_timeout: 60, + query_timeout: 120, +} +``` + +--- + +## 📚 참고 자료 + +- [PostgreSQL Connection Pooling](https://node-postgres.com/features/pooling) +- [MySQL Connection Pool](https://github.com/mysqljs/mysql#pooling-connections) +- [Node.js Best Practices - Database Connection Management](https://github.com/goldbergyoni/nodebestpractices) + +--- + +## 🎯 결론 + +외부 DB 연결 풀 서비스는 다음을 보장합니다: + +✅ **효율성**: 연결 재사용으로 성능 향상 +✅ **안정성**: 연결 풀 고갈 방지 +✅ **자동화**: 생성/정리/모니터링 자동화 +✅ **확장성**: 다중 DB 및 대규모 트래픽 지원 + +**최소한의 설정**으로 **최대한의 안정성**을 제공합니다! 🚀 diff --git a/frontend/app/(main)/admin/dataflow/node-editor/page.tsx b/frontend/app/(main)/admin/dataflow/node-editor/page.tsx index 6fdb0da7..a8ba0d66 100644 --- a/frontend/app/(main)/admin/dataflow/node-editor/page.tsx +++ b/frontend/app/(main)/admin/dataflow/node-editor/page.tsx @@ -1,7 +1,7 @@ "use client"; /** - * 노드 기반 제어 시스템 페이지 + * 제어 시스템 페이지 */ import { FlowEditor } from "@/components/dataflow/node-editor/FlowEditor"; @@ -12,7 +12,7 @@ export default function NodeEditorPage() { {/* 페이지 헤더 */}
-

노드 기반 제어 시스템

+

제어 시스템

드래그 앤 드롭으로 데이터 제어 플로우를 시각적으로 설계하고 관리합니다

diff --git a/frontend/components/dataflow/node-editor/FlowEditor.tsx b/frontend/components/dataflow/node-editor/FlowEditor.tsx index 43da1205..279f64d4 100644 --- a/frontend/components/dataflow/node-editor/FlowEditor.tsx +++ b/frontend/components/dataflow/node-editor/FlowEditor.tsx @@ -16,7 +16,6 @@ import { TableSourceNode } from "./nodes/TableSourceNode"; import { ExternalDBSourceNode } from "./nodes/ExternalDBSourceNode"; import { ReferenceLookupNode } from "./nodes/ReferenceLookupNode"; import { ConditionNode } from "./nodes/ConditionNode"; -import { FieldMappingNode } from "./nodes/FieldMappingNode"; import { InsertActionNode } from "./nodes/InsertActionNode"; import { UpdateActionNode } from "./nodes/UpdateActionNode"; import { DeleteActionNode } from "./nodes/DeleteActionNode"; @@ -35,7 +34,6 @@ const nodeTypes = { referenceLookup: ReferenceLookupNode, // 변환/조건 condition: ConditionNode, - fieldMapping: FieldMappingNode, dataTransform: DataTransformNode, // 액션 insertAction: InsertActionNode, diff --git a/frontend/components/dataflow/node-editor/nodes/FieldMappingNode.tsx b/frontend/components/dataflow/node-editor/nodes/FieldMappingNode.tsx deleted file mode 100644 index 2101aa76..00000000 --- a/frontend/components/dataflow/node-editor/nodes/FieldMappingNode.tsx +++ /dev/null @@ -1,66 +0,0 @@ -"use client"; - -/** - * 필드 매핑 노드 - */ - -import { memo } from "react"; -import { Handle, Position, NodeProps } from "reactflow"; -import { ArrowLeftRight } from "lucide-react"; -import type { FieldMappingNodeData } from "@/types/node-editor"; - -export const FieldMappingNode = memo(({ data, selected }: NodeProps) => { - return ( -
- {/* 입력 핸들 */} - - - {/* 헤더 */} -
- -
-
필드 매핑
-
{data.displayName || "데이터 매핑"}
-
-
- - {/* 본문 */} -
- {data.mappings && data.mappings.length > 0 ? ( -
-
매핑 규칙: ({data.mappings.length}개)
-
- {data.mappings.slice(0, 5).map((mapping) => ( -
-
- {mapping.sourceField || "정적값"} - - {mapping.targetField} -
- {mapping.transform &&
변환: {mapping.transform}
} - {mapping.staticValue !== undefined && ( -
값: {String(mapping.staticValue)}
- )} -
- ))} - {data.mappings.length > 5 && ( -
... 외 {data.mappings.length - 5}개
- )} -
-
- ) : ( -
매핑 규칙 없음
- )} -
- - {/* 출력 핸들 */} - -
- ); -}); - -FieldMappingNode.displayName = "FieldMappingNode"; diff --git a/frontend/components/dataflow/node-editor/panels/PropertiesPanel.tsx b/frontend/components/dataflow/node-editor/panels/PropertiesPanel.tsx index 76eb935a..bd7f8e87 100644 --- a/frontend/components/dataflow/node-editor/panels/PropertiesPanel.tsx +++ b/frontend/components/dataflow/node-editor/panels/PropertiesPanel.tsx @@ -10,7 +10,6 @@ import { useFlowEditorStore } from "@/lib/stores/flowEditorStore"; import { TableSourceProperties } from "./properties/TableSourceProperties"; import { ReferenceLookupProperties } from "./properties/ReferenceLookupProperties"; import { InsertActionProperties } from "./properties/InsertActionProperties"; -import { FieldMappingProperties } from "./properties/FieldMappingProperties"; import { ConditionProperties } from "./properties/ConditionProperties"; import { UpdateActionProperties } from "./properties/UpdateActionProperties"; import { DeleteActionProperties } from "./properties/DeleteActionProperties"; @@ -84,9 +83,6 @@ function NodePropertiesRenderer({ node }: { node: any }) { case "insertAction": return ; - case "fieldMapping": - return ; - case "condition": return ; diff --git a/frontend/components/dataflow/node-editor/panels/properties/ConditionProperties.tsx b/frontend/components/dataflow/node-editor/panels/properties/ConditionProperties.tsx index 4bde32fa..6e80c927 100644 --- a/frontend/components/dataflow/node-editor/panels/properties/ConditionProperties.tsx +++ b/frontend/components/dataflow/node-editor/panels/properties/ConditionProperties.tsx @@ -122,6 +122,24 @@ export function ConditionProperties({ nodeId, data }: ConditionPropertiesProps) } else { fields.push(...upperFields); } + } else if (sourceNode.type === "restAPISource") { + // REST API Source: responseFields 사용 + if (sourceData.responseFields && Array.isArray(sourceData.responseFields)) { + console.log("🔍 [ConditionProperties] REST API 필드:", sourceData.responseFields); + fields.push( + ...sourceData.responseFields.map((f: any) => ({ + name: f.name || f.fieldName, + label: f.label || f.displayName || f.name, + type: f.dataType || f.type, + })), + ); + } else { + console.log("⚠️ [ConditionProperties] REST API에 필드 없음:", sourceData); + } + } else if (sourceNode.type === "condition") { + // 조건 노드: 재귀적으로 상위 노드 필드 수집 (통과 노드) + console.log("✅ [ConditionProperties] 조건 노드 통과 → 상위 탐색"); + fields.push(...getAllSourceFields(sourceNode.id, visited)); } else if ( sourceNode.type === "insertAction" || sourceNode.type === "updateAction" || @@ -130,6 +148,10 @@ export function ConditionProperties({ nodeId, data }: ConditionPropertiesProps) ) { // Action 노드: 재귀적으로 상위 노드 필드 수집 fields.push(...getAllSourceFields(sourceNode.id, visited)); + } else { + // 기타 모든 노드: 재귀적으로 상위 노드 필드 수집 (통과 노드로 처리) + console.log(`✅ [ConditionProperties] 통과 노드 (${sourceNode.type}) → 상위 탐색`); + fields.push(...getAllSourceFields(sourceNode.id, visited)); } } diff --git a/frontend/components/dataflow/node-editor/panels/properties/FieldMappingProperties.tsx b/frontend/components/dataflow/node-editor/panels/properties/FieldMappingProperties.tsx deleted file mode 100644 index 79bdffb8..00000000 --- a/frontend/components/dataflow/node-editor/panels/properties/FieldMappingProperties.tsx +++ /dev/null @@ -1,191 +0,0 @@ -"use client"; - -/** - * 필드 매핑 노드 속성 편집 - */ - -import { useEffect, useState } from "react"; -import { Plus, Trash2, ArrowRight } from "lucide-react"; -import { Label } from "@/components/ui/label"; -import { Input } from "@/components/ui/input"; -import { Button } from "@/components/ui/button"; -import { ScrollArea } from "@/components/ui/scroll-area"; -import { useFlowEditorStore } from "@/lib/stores/flowEditorStore"; -import type { FieldMappingNodeData } from "@/types/node-editor"; - -interface FieldMappingPropertiesProps { - nodeId: string; - data: FieldMappingNodeData; -} - -export function FieldMappingProperties({ nodeId, data }: FieldMappingPropertiesProps) { - const { updateNode } = useFlowEditorStore(); - - const [displayName, setDisplayName] = useState(data.displayName || "데이터 매핑"); - const [mappings, setMappings] = useState(data.mappings || []); - - // 데이터 변경 시 로컬 상태 업데이트 - useEffect(() => { - setDisplayName(data.displayName || "데이터 매핑"); - setMappings(data.mappings || []); - }, [data]); - - const handleAddMapping = () => { - setMappings([ - ...mappings, - { - id: `mapping_${Date.now()}`, - sourceField: "", - targetField: "", - transform: undefined, - staticValue: undefined, - }, - ]); - }; - - const handleRemoveMapping = (id: string) => { - setMappings(mappings.filter((m) => m.id !== id)); - }; - - const handleMappingChange = (id: string, field: string, value: any) => { - const newMappings = mappings.map((m) => (m.id === id ? { ...m, [field]: value } : m)); - setMappings(newMappings); - }; - - const handleSave = () => { - updateNode(nodeId, { - displayName, - mappings, - }); - }; - - return ( - -
- {/* 기본 정보 */} -
-

기본 정보

- -
- - setDisplayName(e.target.value)} - className="mt-1" - placeholder="노드 표시 이름" - /> -
-
- - {/* 매핑 규칙 */} -
-
-

매핑 규칙

- -
- - {mappings.length > 0 ? ( -
- {mappings.map((mapping, index) => ( -
-
- 규칙 #{index + 1} - -
- -
- {/* 소스 → 타겟 */} -
-
- - handleMappingChange(mapping.id, "sourceField", e.target.value)} - placeholder="입력 필드" - className="mt-1 h-8 text-xs" - /> -
- -
- -
- -
- - handleMappingChange(mapping.id, "targetField", e.target.value)} - placeholder="출력 필드" - className="mt-1 h-8 text-xs" - /> -
-
- - {/* 변환 함수 */} -
- - handleMappingChange(mapping.id, "transform", e.target.value)} - placeholder="예: UPPER(), TRIM(), CONCAT()" - className="mt-1 h-8 text-xs" - /> -
- - {/* 정적 값 */} -
- - handleMappingChange(mapping.id, "staticValue", e.target.value)} - placeholder="고정 값 (소스 필드 대신 사용)" - className="mt-1 h-8 text-xs" - /> -
-
-
- ))} -
- ) : ( -
- 매핑 규칙이 없습니다. "추가" 버튼을 클릭하세요. -
- )} -
- - {/* 저장 버튼 */} -
- -
- - {/* 안내 */} -
-
- 💡 소스 필드: 입력 데이터의 필드명 -
-
- 💡 타겟 필드: 출력 데이터의 필드명 -
-
- 💡 변환 함수: 데이터 변환 로직 (SQL 함수 형식) -
-
-
-
- ); -} diff --git a/frontend/components/dataflow/node-editor/panels/properties/InsertActionProperties.tsx b/frontend/components/dataflow/node-editor/panels/properties/InsertActionProperties.tsx index 0ee91d06..84a55270 100644 --- a/frontend/components/dataflow/node-editor/panels/properties/InsertActionProperties.tsx +++ b/frontend/components/dataflow/node-editor/panels/properties/InsertActionProperties.tsx @@ -137,8 +137,10 @@ export function InsertActionProperties({ nodeId, data }: InsertActionPropertiesP const getAllSourceFields = ( targetNodeId: string, visitedNodes = new Set(), - ): { fields: Array<{ name: string; label?: string }>; hasRestAPI: boolean } => { + sourcePath: string[] = [], // 🔥 소스 경로 추적 + ): { fields: Array<{ name: string; label?: string; sourcePath?: string[] }>; hasRestAPI: boolean } => { if (visitedNodes.has(targetNodeId)) { + console.log(`⚠️ 순환 참조 감지: ${targetNodeId} (이미 방문함)`); return { fields: [], hasRestAPI: false }; } visitedNodes.add(targetNodeId); @@ -147,19 +149,27 @@ export function InsertActionProperties({ nodeId, data }: InsertActionPropertiesP const sourceNodeIds = inputEdges.map((edge) => edge.source); const sourceNodes = nodes.filter((node) => sourceNodeIds.includes(node.id)); - const fields: Array<{ name: string; label?: string }> = []; + // 🔥 다중 소스 감지 + if (sourceNodes.length > 1) { + console.log(`⚠️ 다중 소스 감지: ${sourceNodes.length}개 노드 연결됨`); + console.log(" 소스 노드들:", sourceNodes.map((n) => `${n.id}(${n.type})`).join(", ")); + } + + const fields: Array<{ name: string; label?: string; sourcePath?: string[] }> = []; let foundRestAPI = false; sourceNodes.forEach((node) => { console.log(`🔍 노드 ${node.id} 타입: ${node.type}`); - console.log(`🔍 노드 ${node.id} 데이터:`, node.data); - // 데이터 변환 노드인 경우: 변환된 필드 + 상위 노드의 원본 필드 + // 🔥 현재 노드를 경로에 추가 + const currentPath = [...sourcePath, `${node.id}(${node.type})`]; + + // 1️⃣ 데이터 변환 노드: 변환된 필드 + 상위 노드의 원본 필드 if (node.type === "dataTransform") { console.log("✅ 데이터 변환 노드 발견"); // 상위 노드의 원본 필드 먼저 수집 - const upperResult = getAllSourceFields(node.id, visitedNodes); + const upperResult = getAllSourceFields(node.id, visitedNodes, currentPath); const upperFields = upperResult.fields; foundRestAPI = foundRestAPI || upperResult.hasRestAPI; console.log(` 📤 상위 노드에서 ${upperFields.length}개 필드 가져옴`); @@ -167,7 +177,7 @@ export function InsertActionProperties({ nodeId, data }: InsertActionPropertiesP // 변환된 필드 추가 (in-place 변환 고려) if ((node.data as any).transformations && Array.isArray((node.data as any).transformations)) { console.log(` 📊 ${(node.data as any).transformations.length}개 변환 발견`); - const inPlaceFields = new Set(); // in-place 변환된 필드 추적 + const inPlaceFields = new Set(); (node.data as any).transformations.forEach((transform: any) => { const targetField = transform.targetField || transform.sourceField; @@ -176,32 +186,29 @@ export function InsertActionProperties({ nodeId, data }: InsertActionPropertiesP console.log(` 🔹 변환: ${transform.sourceField} → ${targetField} ${isInPlace ? "(in-place)" : ""}`); if (isInPlace) { - // in-place: 원본 필드를 덮어쓰므로, 원본 필드는 이미 upperFields에 있음 inPlaceFields.add(transform.sourceField); } else if (targetField) { - // 새 필드 생성 fields.push({ name: targetField, label: transform.targetFieldLabel || targetField, + sourcePath: currentPath, }); } }); - // 상위 필드 중 in-place 변환되지 않은 것만 추가 + // 상위 필드 추가 upperFields.forEach((field) => { if (!inPlaceFields.has(field.name)) { fields.push(field); } else { - // in-place 변환된 필드도 추가 (변환 후 값) fields.push(field); } }); } else { - // 변환이 없으면 상위 필드만 추가 fields.push(...upperFields); } } - // REST API 소스 노드인 경우 + // 2️⃣ REST API 소스 노드 else if (node.type === "restAPISource") { console.log("✅ REST API 소스 노드 발견"); foundRestAPI = true; @@ -216,6 +223,7 @@ export function InsertActionProperties({ nodeId, data }: InsertActionPropertiesP fields.push({ name: fieldName, label: fieldLabel, + sourcePath: currentPath, }); } }); @@ -223,26 +231,44 @@ export function InsertActionProperties({ nodeId, data }: InsertActionPropertiesP console.log("⚠️ REST API 노드에 responseFields 없음"); } } - // 일반 소스 노드인 경우 (테이블 소스 등) - else { + // 3️⃣ 테이블/외부DB 소스 노드 + else if (node.type === "tableSource" || node.type === "externalDBSource") { const nodeFields = (node.data as any).fields || (node.data as any).outputFields; + const displayName = (node.data as any).displayName || (node.data as any).tableName || node.id; if (nodeFields && Array.isArray(nodeFields)) { - console.log(`✅ 노드 ${node.id}에서 ${nodeFields.length}개 필드 발견`); + console.log(`✅ ${node.type}[${displayName}] 노드에서 ${nodeFields.length}개 필드 발견`); nodeFields.forEach((field: any) => { const fieldName = field.name || field.fieldName || field.column_name; const fieldLabel = field.label || field.displayName || field.label_ko; if (fieldName) { + // 🔥 다중 소스인 경우 필드명에 소스 표시 + const displayLabel = + sourceNodes.length > 1 ? `${fieldLabel || fieldName} [${displayName}]` : fieldLabel || fieldName; + fields.push({ name: fieldName, - label: fieldLabel, + label: displayLabel, + sourcePath: currentPath, }); } }); } else { - console.log(`❌ 노드 ${node.id}에 fields 없음`); + console.log(`⚠️ ${node.type} 노드에 필드 정의 없음 → 상위 노드 탐색`); + // 필드가 없으면 상위 노드로 계속 탐색 + const upperResult = getAllSourceFields(node.id, visitedNodes, currentPath); + fields.push(...upperResult.fields); + foundRestAPI = foundRestAPI || upperResult.hasRestAPI; } } + // 4️⃣ 통과 노드 (조건, 기타 모든 노드): 상위 노드로 계속 탐색 + else { + console.log(`✅ 통과 노드 (${node.type}) → 상위 노드로 계속 탐색`); + const upperResult = getAllSourceFields(node.id, visitedNodes, currentPath); + fields.push(...upperResult.fields); + foundRestAPI = foundRestAPI || upperResult.hasRestAPI; + console.log(` 📤 상위 노드에서 ${upperResult.fields.length}개 필드 가져옴`); + } }); return { fields, hasRestAPI: foundRestAPI }; @@ -251,8 +277,30 @@ export function InsertActionProperties({ nodeId, data }: InsertActionPropertiesP console.log("🔍 INSERT 노드 ID:", nodeId); const result = getAllSourceFields(nodeId); - // 중복 제거 - const uniqueFields = Array.from(new Map(result.fields.map((field) => [field.name, field])).values()); + console.log("📊 필드 수집 완료:"); + console.log(` - 총 필드 수: ${result.fields.length}개`); + console.log(` - REST API 포함: ${result.hasRestAPI}`); + + // 🔥 중복 제거 개선: 필드명이 같아도 소스가 다르면 모두 표시 + const fieldMap = new Map(); + const duplicateFields = new Set(); + + result.fields.forEach((field) => { + const key = `${field.name}`; + if (fieldMap.has(key)) { + duplicateFields.add(field.name); + } + // 중복이면 마지막 값으로 덮어씀 (기존 동작 유지) + fieldMap.set(key, field); + }); + + if (duplicateFields.size > 0) { + console.warn(`⚠️ 중복 필드명 감지: ${Array.from(duplicateFields).join(", ")}`); + console.warn(" → 마지막으로 발견된 필드만 표시됩니다."); + console.warn(" → 다중 소스 사용 시 필드명이 겹치지 않도록 주의하세요!"); + } + + const uniqueFields = Array.from(fieldMap.values()); setSourceFields(uniqueFields); setHasRestAPISource(result.hasRestAPI); diff --git a/frontend/components/dataflow/node-editor/panels/properties/UpdateActionProperties.tsx b/frontend/components/dataflow/node-editor/panels/properties/UpdateActionProperties.tsx index 863725ff..05c40fa4 100644 --- a/frontend/components/dataflow/node-editor/panels/properties/UpdateActionProperties.tsx +++ b/frontend/components/dataflow/node-editor/panels/properties/UpdateActionProperties.tsx @@ -166,14 +166,12 @@ export function UpdateActionProperties({ nodeId, data }: UpdateActionPropertiesP let foundRestAPI = false; sourceNodes.forEach((node) => { - // 데이터 변환 노드인 경우: 변환된 필드 + 상위 노드의 원본 필드 + // 1️⃣ 데이터 변환 노드: 변환된 필드 + 상위 노드의 원본 필드 if (node.type === "dataTransform") { - // 상위 노드의 원본 필드 먼저 수집 const upperResult = getAllSourceFields(node.id, visitedNodes); const upperFields = upperResult.fields; foundRestAPI = foundRestAPI || upperResult.hasRestAPI; - // 변환된 필드 추가 (in-place 변환 고려) if ((node.data as any).transformations && Array.isArray((node.data as any).transformations)) { const inPlaceFields = new Set(); @@ -191,7 +189,6 @@ export function UpdateActionProperties({ nodeId, data }: UpdateActionPropertiesP } }); - // 상위 필드 추가 (모두 포함, in-place는 변환 후 값) upperFields.forEach((field) => { fields.push(field); }); @@ -199,7 +196,7 @@ export function UpdateActionProperties({ nodeId, data }: UpdateActionPropertiesP fields.push(...upperFields); } } - // REST API 소스 노드인 경우 + // 2️⃣ REST API 소스 노드 else if (node.type === "restAPISource") { foundRestAPI = true; const responseFields = (node.data as any).responseFields; @@ -216,21 +213,33 @@ export function UpdateActionProperties({ nodeId, data }: UpdateActionPropertiesP }); } } - // 일반 소스 노드인 경우 - else if (node.type === "tableSource" && (node.data as any).fields) { - (node.data as any).fields.forEach((field: any) => { - fields.push({ - name: field.name, - label: field.label || field.displayName, + // 3️⃣ 테이블/외부DB 소스 노드 + else if (node.type === "tableSource" || node.type === "externalDBSource") { + const nodeFields = (node.data as any).fields || (node.data as any).outputFields; + + if (nodeFields && Array.isArray(nodeFields)) { + nodeFields.forEach((field: any) => { + const fieldName = field.name || field.fieldName || field.column_name; + const fieldLabel = field.label || field.displayName || field.label_ko; + if (fieldName) { + fields.push({ + name: fieldName, + label: fieldLabel, + }); + } }); - }); - } else if (node.type === "externalDBSource" && (node.data as any).fields) { - (node.data as any).fields.forEach((field: any) => { - fields.push({ - name: field.name, - label: field.label || field.displayName, - }); - }); + } else { + // 필드가 없으면 상위 노드로 계속 탐색 + const upperResult = getAllSourceFields(node.id, visitedNodes); + fields.push(...upperResult.fields); + foundRestAPI = foundRestAPI || upperResult.hasRestAPI; + } + } + // 4️⃣ 통과 노드 (조건, 기타 모든 노드): 상위 노드로 계속 탐색 + else { + const upperResult = getAllSourceFields(node.id, visitedNodes); + fields.push(...upperResult.fields); + foundRestAPI = foundRestAPI || upperResult.hasRestAPI; } }); diff --git a/frontend/components/dataflow/node-editor/panels/properties/UpsertActionProperties.tsx b/frontend/components/dataflow/node-editor/panels/properties/UpsertActionProperties.tsx index 03aa29f4..3e27d910 100644 --- a/frontend/components/dataflow/node-editor/panels/properties/UpsertActionProperties.tsx +++ b/frontend/components/dataflow/node-editor/panels/properties/UpsertActionProperties.tsx @@ -153,14 +153,12 @@ export function UpsertActionProperties({ nodeId, data }: UpsertActionPropertiesP let foundRestAPI = false; sourceNodes.forEach((node) => { - // 데이터 변환 노드인 경우: 변환된 필드 + 상위 노드의 원본 필드 + // 1️⃣ 데이터 변환 노드: 변환된 필드 + 상위 노드의 원본 필드 if (node.type === "dataTransform") { - // 상위 노드의 원본 필드 먼저 수집 const upperResult = getAllSourceFields(node.id, visitedNodes); const upperFields = upperResult.fields; foundRestAPI = foundRestAPI || upperResult.hasRestAPI; - // 변환된 필드 추가 (in-place 변환 고려) if ((node.data as any).transformations && Array.isArray((node.data as any).transformations)) { const inPlaceFields = new Set(); @@ -178,7 +176,6 @@ export function UpsertActionProperties({ nodeId, data }: UpsertActionPropertiesP } }); - // 상위 필드 추가 (모두 포함, in-place는 변환 후 값) upperFields.forEach((field) => { fields.push(field); }); @@ -186,7 +183,7 @@ export function UpsertActionProperties({ nodeId, data }: UpsertActionPropertiesP fields.push(...upperFields); } } - // REST API 소스 노드인 경우 + // 2️⃣ REST API 소스 노드 else if (node.type === "restAPISource") { foundRestAPI = true; const responseFields = (node.data as any).responseFields; @@ -203,21 +200,33 @@ export function UpsertActionProperties({ nodeId, data }: UpsertActionPropertiesP }); } } - // 일반 소스 노드인 경우 - else if (node.type === "tableSource" && (node.data as any).fields) { - (node.data as any).fields.forEach((field: any) => { - fields.push({ - name: field.name, - label: field.label || field.displayName, + // 3️⃣ 테이블/외부DB 소스 노드 + else if (node.type === "tableSource" || node.type === "externalDBSource") { + const nodeFields = (node.data as any).fields || (node.data as any).outputFields; + + if (nodeFields && Array.isArray(nodeFields)) { + nodeFields.forEach((field: any) => { + const fieldName = field.name || field.fieldName || field.column_name; + const fieldLabel = field.label || field.displayName || field.label_ko; + if (fieldName) { + fields.push({ + name: fieldName, + label: fieldLabel, + }); + } }); - }); - } else if (node.type === "externalDBSource" && (node.data as any).fields) { - (node.data as any).fields.forEach((field: any) => { - fields.push({ - name: field.name, - label: field.label || field.displayName, - }); - }); + } else { + // 필드가 없으면 상위 노드로 계속 탐색 + const upperResult = getAllSourceFields(node.id, visitedNodes); + fields.push(...upperResult.fields); + foundRestAPI = foundRestAPI || upperResult.hasRestAPI; + } + } + // 4️⃣ 통과 노드 (조건, 기타 모든 노드): 상위 노드로 계속 탐색 + else { + const upperResult = getAllSourceFields(node.id, visitedNodes); + fields.push(...upperResult.fields); + foundRestAPI = foundRestAPI || upperResult.hasRestAPI; } }); diff --git a/frontend/components/dataflow/node-editor/sidebar/nodePaletteConfig.ts b/frontend/components/dataflow/node-editor/sidebar/nodePaletteConfig.ts index 176ade1f..4369e0fc 100644 --- a/frontend/components/dataflow/node-editor/sidebar/nodePaletteConfig.ts +++ b/frontend/components/dataflow/node-editor/sidebar/nodePaletteConfig.ts @@ -52,14 +52,6 @@ export const NODE_PALETTE: NodePaletteItem[] = [ category: "transform", color: "#EAB308", // 노란색 }, - { - type: "fieldMapping", - label: "필드 매핑", - icon: "🔀", - description: "소스 필드를 타겟 필드로 매핑합니다", - category: "transform", - color: "#8B5CF6", // 보라색 - }, { type: "dataTransform", label: "데이터 변환", diff --git a/frontend/types/node-editor.ts b/frontend/types/node-editor.ts index d29f006b..8959a691 100644 --- a/frontend/types/node-editor.ts +++ b/frontend/types/node-editor.ts @@ -14,7 +14,6 @@ export type NodeType = | "restAPISource" // REST API 소스 | "referenceLookup" // 참조 테이블 조회 (내부 DB 전용) | "condition" // 조건 분기 - | "fieldMapping" // 필드 매핑 | "dataTransform" // 데이터 변환 | "insertAction" // INSERT 액션 | "updateAction" // UPDATE 액션