/**
 * Node flow execution engine.
 *
 * Features:
 * - Topological sort
 * - Level-by-level parallel execution (Promise.allSettled)
 * - Independent transaction handling
 * - Cascading stop (children are skipped when a parent fails)
 */
import { query, queryOne, transaction } from "../database/db";
import { logger } from "../utils/logger";

// ===== Type definitions =====

/** A single node of a flow graph. */
export interface FlowNode {
  id: string;
  type: NodeType;
  position?: { x: number; y: number };
  data: NodeData;
}

/** Node kinds that may appear in a flow. */
export type NodeType =
  | "tableSource"
  | "externalDBSource"
  | "restAPISource"
  | "condition"
  | "fieldMapping"
  | "dataTransform"
  | "insertAction"
  | "updateAction"
  | "deleteAction"
  | "upsertAction"
  | "comment"
  | "log";

/** Free-form, node-type specific configuration. */
export interface NodeData {
  displayName?: string;
  [key: string]: any;
}

/** Directed edge; `source` and `target` hold node ids. */
export interface FlowEdge {
  id: string;
  source: string;
  target: string;
  sourceHandle?: string;
  targetHandle?: string;
}

/** Mutable state shared by every node during one flow run. */
export interface ExecutionContext {
  sourceData?: any[]; // data injected from outside (selected rows or form data)
  dataSourceType?: string; // "table-selection" | "form" | "none"
  nodeResults: Map<string, NodeResult>;
  executionOrder: string[];
  buttonContext?: ButtonContext;
}

/** Information about the UI button that triggered the run. */
export interface ButtonContext {
  buttonId: string;
  screenId?: number;
  companyCode?: string;
  userId?: string;
  formData?: Record<string, any>;
  selectedRowsData?: Record<string, any>[];
}

/** Outcome of one node execution. */
export interface NodeResult {
  nodeId: string;
  status: "pending" | "success" | "failed" | "skipped";
  data?: any;
  error?: Error;
  startTime: number;
  endTime?: number;
}

/** Aggregated outcome of a whole flow run. */
export interface ExecutionResult {
  success: boolean;
  message: string;
  executionTime: number;
  nodes: NodeExecutionSummary[];
  summary: {
    total: number;
    success: number;
    failed: number;
    skipped: number;
  };
}

/** Per-node entry of {@link ExecutionResult.nodes}. */
export interface NodeExecutionSummary {
  nodeId: string;
  nodeName: string;
  nodeType: NodeType;
  status: "success" | "failed" | "skipped" | "pending";
  duration?: number;
  error?: string;
}

// ===== Main execution service =====

export class NodeFlowExecutionService {
  /**
   * Entry point: load a stored flow, sort its nodes into levels and run the
   * levels one after another.
   *
   * @param flowId - Primary key of the row in `node_flows`.
   * @param contextData - Caller-supplied data; values are read either from the
   *   top level or from a nested `context` object.
   * @returns The result built by `generateExecutionResult`.
   * @throws Error when the flow row does not exist; any other error raised
   *   while loading or running is logged and rethrown.
   */
  static async executeFlow(
    flowId: number,
    contextData: Record<string, any>
  ): Promise<ExecutionResult> {
    const startTime = Date.now();

    try {
      logger.info(`🚀 플로우 실행 시작: flowId=${flowId}`);

      // 1. Load the flow definition
      const flow = await queryOne<{
        flow_id: number;
        flow_name: string;
        flow_data: any;
      }>(
        `SELECT flow_id, flow_name, flow_data FROM node_flows WHERE flow_id = $1`,
        [flowId]
      );

      if (!flow) {
        throw new Error(`플로우를 찾을 수 없습니다: flowId=${flowId}`);
      }

      // flow_data may arrive as a JSON string or as an already parsed object.
      const flowData =
        typeof flow.flow_data === "string"
          ? JSON.parse(flow.flow_data)
          : flow.flow_data;

      // Fall back to empty arrays when `nodes`/`edges` are missing so the
      // `.length` reads below and the sort cannot throw a TypeError.
      const nodes: FlowNode[] = Array.isArray(flowData?.nodes)
        ? flowData.nodes
        : [];
      const edges: FlowEdge[] = Array.isArray(flowData?.edges)
        ? flowData.edges
        : [];

      logger.info(`📊 플로우 정보:`, {
        flowName: flow.flow_name,
        nodeCount: nodes.length,
        edgeCount: edges.length,
      });

      // 2. Prepare the execution context
      const context: ExecutionContext = {
        sourceData: contextData.sourceData || [],
        dataSourceType: contextData.dataSourceType || "none",
        nodeResults: new Map(),
        executionOrder: [],
        buttonContext: {
          buttonId:
            contextData.buttonId || contextData.context?.buttonId || "unknown",
          screenId: contextData.screenId || contextData.context?.screenId,
          companyCode:
            contextData.companyCode || contextData.context?.companyCode,
          userId: contextData.userId || contextData.context?.userId,
          formData: contextData.formData || contextData.context?.formData,
          selectedRowsData:
            contextData.selectedRowsData ||
            contextData.context?.selectedRowsData,
        },
      };

      logger.info(`📦 실행 컨텍스트:`, {
        dataSourceType: context.dataSourceType,
        sourceDataCount: context.sourceData?.length || 0,
        buttonContext: context.buttonContext,
      });

      // 3. Topological sort
      const levels = this.topologicalSort(nodes, edges);
      logger.info(`📋 실행 순서 (레벨별):`, levels);

      // 4. Run level by level (levels are sequential, nodes inside are parallel)
      for (const level of levels) {
        await this.executeLevel(level, nodes, edges, context);
      }

      // 5. Build the result
      const executionTime = Date.now() - startTime;
      const result = this.generateExecutionResult(
        nodes,
        context,
        executionTime
      );

      logger.info(`✅ 플로우 실행 완료:`, result.summary);

      return result;
    } catch (error) {
      logger.error(`❌ 플로우 실행 실패:`, error);
      throw error;
    }
  }

  /**
   * Build the source-data array from the caller's context according to
   * `controlDataSource` ("form", "table-selection", "both", or anything else).
   */
  private static prepareSourceData(contextData: Record<string, any>): any[] {
    const { controlDataSource, formData, selectedRowsData } = contextData;

    switch (controlDataSource) {
      case "form":
        return formData ?
[formData] : [];

      case "table-selection":
        return selectedRowsData || [];

      case "both":
        return [
          { source: "form", data: formData },
          { source: "table", data: selectedRowsData },
        ];

      default:
        return formData ? [formData] : [];
    }
  }

  /**
   * Topological sort that groups the graph into levels.
   *
   * Level 0 holds every node without incoming edges; each following level
   * holds the nodes whose last remaining parent sits in the previous level.
   * Edges that reference a node id not present in `nodes` are ignored (with a
   * warning) so they can neither block a real node forever nor schedule a
   * non-existent one. Nodes that are part of a cycle never reach in-degree 0
   * and are therefore not scheduled; a warning reports how many were left out.
   *
   * @param nodes - All nodes of the flow.
   * @param edges - All edges of the flow.
   * @returns Node ids grouped by level, in execution order.
   */
  private static topologicalSort(
    nodes: FlowNode[],
    edges: FlowEdge[]
  ): string[][] {
    const levels: string[][] = [];
    const inDegree = new Map<string, number>();
    const adjacency = new Map<string, string[]>();

    // Initialise bookkeeping for every known node
    nodes.forEach((node) => {
      inDegree.set(node.id, 0);
      adjacency.set(node.id, []);
    });

    // Count incoming edges, considering only edges between known nodes
    edges.forEach((edge) => {
      const children = adjacency.get(edge.source);
      if (!children || !inDegree.has(edge.target)) {
        logger.warn(
          `⚠️ 존재하지 않는 노드를 참조하는 엣지 무시: ${edge.source} -> ${edge.target}`
        );
        return;
      }
      inDegree.set(edge.target, (inDegree.get(edge.target) ?? 0) + 1);
      children.push(edge.target);
    });

    // Peel the graph level by level
    let currentLevel = nodes
      .filter((node) => inDegree.get(node.id) === 0)
      .map((node) => node.id);

    while (currentLevel.length > 0) {
      levels.push([...currentLevel]);

      const nextLevel: string[] = [];
      for (const nodeId of currentLevel) {
        for (const neighbor of adjacency.get(nodeId) ?? []) {
          const newDegree = (inDegree.get(neighbor) ?? 0) - 1;
          inDegree.set(neighbor, newDegree);
          if (newDegree === 0) {
            nextLevel.push(neighbor);
          }
        }
      }

      currentLevel = nextLevel;
    }

    const scheduledCount = levels.reduce((sum, level) => sum + level.length, 0);
    if (scheduledCount < nodes.length) {
      logger.warn(
        `⚠️ 순환 참조로 인해 실행되지 않는 노드: ${nodes.length - scheduledCount}개`
      );
    }

    return levels;
  }

  /**
   * Run every node of one level in parallel and record the outcomes.
   *
   * Fulfilled results are stored in `context.nodeResults` and appended to
   * `context.executionOrder`; a rejected promise is stored as a "failed"
   * result carrying the rejection reason.
   */
  private static async executeLevel(
    nodeIds: string[],
    nodes: FlowNode[],
    edges: FlowEdge[],
    context: ExecutionContext
  ): Promise<void> {
    logger.info(`⏳ 레벨 실행 시작: ${nodeIds.length}개 노드`);

    // Parallel execution; allSettled never rejects
    const results = await Promise.allSettled(
      nodeIds.map((nodeId) => this.executeNode(nodeId, nodes, edges, context))
    );

    // Store the results
    results.forEach((result, index) => {
      const nodeId = nodeIds[index];

      if (result.status === "fulfilled") {
        context.nodeResults.set(nodeId, result.value);
        context.executionOrder.push(nodeId);
      } else {
        context.nodeResults.set(nodeId, {
          nodeId,
          status: "failed",
          error: result.reason,
          startTime: Date.now(),
          endTime: Date.now(),
        });
      }
    });

    logger.info(`✅ 레벨 실행 완료`);
  }

  /**
   * Execute a single node.
   */
  private static async
executeNode(
    nodeId: string,
    nodes: FlowNode[],
    edges: FlowEdge[],
    context: ExecutionContext
  ): Promise<NodeResult> {
    // Looks the node up, skips it when a parent did not run successfully,
    // otherwise executes it and converts the outcome (including a thrown
    // error) into a NodeResult. Throws only when `nodeId` is unknown.
    const startTime = Date.now();
    const node = nodes.find((n) => n.id === nodeId);

    if (!node) {
      throw new Error(`노드를 찾을 수 없습니다: ${nodeId}`);
    }

    logger.info(`🔄 노드 실행 시작: ${nodeId} (${node.type})`);

    // 1. Check parent status (cascading stop). A "skipped" parent blocks the
    //    node as well: otherwise only the direct children of a failed node
    //    would be skipped and grandchildren would still run.
    const parents = this.getParentNodes(nodeId, edges);
    const blockedByParent = parents.some((parentId) => {
      const parentStatus = context.nodeResults.get(parentId)?.status;
      return parentStatus === "failed" || parentStatus === "skipped";
    });

    if (blockedByParent) {
      logger.warn(`⏭️ 노드 스킵 (부모 실패): ${nodeId}`);
      return {
        nodeId,
        status: "skipped",
        error: new Error("Parent node failed"),
        startTime,
        endTime: Date.now(),
      };
    }

    // 2. Prepare the input data
    const inputData = this.prepareInputData(nodeId, parents, edges, context);

    // 3. Execute according to node type
    try {
      const result = await this.executeNodeByType(node, inputData, context);

      logger.info(`✅ 노드 실행 성공: ${nodeId}`);

      return {
        nodeId,
        status: "success",
        data: result,
        startTime,
        endTime: Date.now(),
      };
    } catch (error) {
      logger.error(`❌ 노드 실행 실패: ${nodeId}`, error);

      return {
        nodeId,
        status: "failed",
        error: error as Error,
        startTime,
        endTime: Date.now(),
      };
    }
  }

  /**
   * Return the ids of all nodes that have an edge pointing at `nodeId`.
   */
  private static getParentNodes(nodeId: string, edges: FlowEdge[]): string[] {
    return edges
      .filter((edge) => edge.target === nodeId)
      .map((edge) => edge.source);
  }

  /**
   * Select the input for a node from its parents' results.
   *
   * - no parent: the injected `context.sourceData`
   * - one parent: that parent's `data` (or `sourceData` when it is falsy)
   * - several parents: an array with one entry per parent, same fallback
   */
  private static prepareInputData(
    nodeId: string,
    parents: string[],
    edges: FlowEdge[],
    context: ExecutionContext
  ): any {
    if (parents.length === 0) {
      // Source node: use the original data
      return context.sourceData;
    }

    if (parents.length === 1) {
      // Single parent: pass its result data through
      const parentResult = context.nodeResults.get(parents[0]);
      return parentResult?.data || context.sourceData;
    }

    // Multiple parents: collect the data of every parent
    return parents.map((parentId) => {
      const result = context.nodeResults.get(parentId);
      return result?.data || context.sourceData;
    });
  }

  /**
   * Dispatch a node to the executor matching its type.
   */
  private static async executeNodeByType(
    node: FlowNode,
    inputData:
any,
    context: ExecutionContext
  ): Promise<any> {
    switch (node.type) {
      case "tableSource":
        return this.executeTableSource(node, context);

      case "dataTransform":
        return this.executeDataTransform(node, inputData, context);

      case "insertAction":
        return this.executeInsertAction(node, inputData, context);

      case "updateAction":
        return this.executeUpdateAction(node, inputData, context);

      case "deleteAction":
        return this.executeDeleteAction(node, inputData, context);

      case "upsertAction":
        return this.executeUpsertAction(node, inputData, context);

      case "condition":
        return this.executeCondition(node, inputData, context);

      case "comment":
      case "log":
        // Log/comment nodes pass through without doing any work
        logger.info(`📝 ${node.type}: ${node.data.displayName || node.id}`);
        return { message: "Logged" };

      default:
        logger.warn(`⚠️ 지원하지 않는 노드 타입: ${node.type}`);
        return { message: "Unsupported node type" };
    }
  }

  /**
   * Execute a table-source node.
   *
   * Externally injected `context.sourceData` wins when it is a non-empty
   * array; otherwise the configured table is queried. Returns `[]` when no
   * table name is configured.
   */
  private static async executeTableSource(
    node: FlowNode,
    context: ExecutionContext
  ): Promise<any> {
    // Prefer data injected from outside
    if (
      context.sourceData &&
      Array.isArray(context.sourceData) &&
      context.sourceData.length > 0
    ) {
      logger.info(
        `📊 외부 주입 데이터 사용: ${context.dataSourceType}, ${context.sourceData.length}건`
      );
      return context.sourceData;
    }

    // No external data: run a DB query
    const { tableName, schema, whereConditions } = node.data;

    if (!tableName) {
      logger.warn(
        "⚠️ 테이블 소스 노드에 테이블명이 없고, 외부 데이터도 없습니다."
      );
      return [];
    }

    const schemaPrefix = schema ? `${schema}.` : "";

    // NOTE(review): the UPDATE/DELETE call sites use the helper's result as a
    // complete clause, while this one prefixes "WHERE". The helper is not
    // visible here, so accept both shapes: strip a leading WHERE if present
    // and emit no clause at all when the helper yields nothing.
    const condition = whereConditions
      ? String(this.buildWhereClause(whereConditions) ?? "")
          .trim()
          .replace(/^WHERE\s+/i, "")
      : "";
    const whereClause = condition ? `WHERE ${condition}` : "";

    // NOTE(review): schema/table names are interpolated as identifiers; they
    // come from the stored node configuration — confirm they are trusted.
    const sql = `SELECT * FROM ${schemaPrefix}${tableName} ${whereClause}`;

    const result = await query(sql, []);

    logger.info(`📊 테이블 소스 조회: ${tableName}, ${result.length}건`);

    return result;
  }

  /**
   * Execute an INSERT action node by dispatching on `targetType`
   * ("internal", "external", "api"; anything else is treated as internal).
   */
  private static async executeInsertAction(
    node: FlowNode,
    inputData: any,
    context: ExecutionContext
  ): Promise<any> {
    const { targetType } = node.data;

    // Branch by target type
    switch (targetType) {
      case "internal":
        return this.executeInternalInsert(node, inputData, context);

      case "external":
        return this.executeExternalInsert(node, inputData, context);

      case "api":
        return this.executeApiInsert(node, inputData, context);

      default:
        // Backward compatibility: a missing targetType means internal
        logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
        return this.executeInternalInsert(node, inputData, context);
    }
  }

  /**
   * INSERT into the internal database, one statement per input row, inside a
   * single `transaction` callback.
   *
   * @throws Error when the target table or the field mappings are missing.
   */
  private static async executeInternalInsert(
    node: FlowNode,
    inputData: any,
    context: ExecutionContext
  ): Promise<any> {
    const { targetTable, fieldMappings } = node.data;

    // Fail with a clear message before opening a transaction instead of a
    // TypeError on `fieldMappings.forEach` or an empty column list in SQL.
    if (
      !targetTable ||
      !Array.isArray(fieldMappings) ||
      fieldMappings.length === 0
    ) {
      throw new Error("INSERT 액션에 필수 설정이 누락되었습니다.");
    }

    return transaction(async (client) => {
      const dataArray = Array.isArray(inputData) ? inputData : [inputData];
      let insertedCount = 0;

      for (const data of dataArray) {
        const fields: string[] = [];
        const values: any[] = [];

        fieldMappings.forEach((mapping: any) => {
          fields.push(mapping.targetField);
          // A static value wins over the value read from the row
          const value =
            mapping.staticValue !== undefined ?
mapping.staticValue : data[mapping.sourceField];
          values.push(value);
        });

        const sql = ` INSERT INTO ${targetTable} (${fields.join(", ")}) VALUES (${fields.map((_, i) => `$${i + 1}`).join(", ")}) `;

        await client.query(sql, values);
        insertedCount++;
      }

      logger.info(
        `✅ INSERT 완료 (내부 DB): ${targetTable}, ${insertedCount}건`
      );

      return { insertedCount };
    });
  }

  /**
   * INSERT into an external database, one statement per input row.
   *
   * Placeholder syntax follows `externalDbType`: `:n` for oracle, `?` for
   * mysql/mariadb, `@pn` for mssql and `$n` otherwise. The connector is always
   * disconnected; on error the rollback helper is invoked before rethrowing.
   *
   * @throws Error when the connection id or target table is not configured.
   */
  private static async executeExternalInsert(
    node: FlowNode,
    inputData: any,
    context: ExecutionContext
  ): Promise<any> {
    const {
      externalConnectionId,
      externalDbType,
      externalTargetTable,
      fieldMappings,
    } = node.data;

    if (!externalConnectionId || !externalTargetTable) {
      throw new Error("외부 DB 커넥션 또는 테이블이 설정되지 않았습니다.");
    }

    logger.info(
      `🔌 외부 DB INSERT 시작: ${externalDbType} - ${externalTargetTable}`
    );

    // Create the external DB connector
    const connector = await this.createExternalConnector(
      externalConnectionId,
      externalDbType
    );

    try {
      const dataArray = Array.isArray(inputData) ? inputData : [inputData];
      let insertedCount = 0;

      // The dialect does not change between rows, so resolve it once.
      const dbType = externalDbType.toLowerCase();
      const placeholderFor = (position: number): string => {
        if (dbType === "oracle") {
          return `:${position}`; // Oracle: :1, :2, ...
        }
        if (dbType === "mysql" || dbType === "mariadb") {
          return "?"; // MySQL/MariaDB: positional ?
        }
        if (dbType === "mssql") {
          return `@p${position}`; // MSSQL: @p1, @p2, ...
        }
        return `$${position}`; // PostgreSQL style (default): $1, $2, ...
      };

      for (const data of dataArray) {
        const fields: string[] = [];
        const values: any[] = [];

        fieldMappings.forEach((mapping: any) => {
          fields.push(mapping.targetField);
          const value =
            mapping.staticValue !== undefined
              ? mapping.staticValue
              : data[mapping.sourceField];
          values.push(value);
        });

        const placeholders = fields
          .map((_, i) => placeholderFor(i + 1))
          .join(", ");
        const sql = `INSERT INTO ${externalTargetTable} (${fields.join(", ")}) VALUES (${placeholders})`;

        await connector.executeQuery(sql, values);
        insertedCount++;
      }

      // Explicit COMMIT (the helper only acts for Oracle)
      await this.commitExternalTransaction(
        connector,
        externalDbType,
        insertedCount
      );

      logger.info(
        `✅ INSERT 완료 (외부 DB): ${externalTargetTable}, ${insertedCount}건`
      );

      return { insertedCount };
    } catch (error) {
      // ROLLBACK on error (the helper only acts for Oracle)
      await this.rollbackExternalTransaction(connector, externalDbType);
      throw error;
    } finally {
      // Release the connection
      await connector.disconnect();
    }
  }

  /**
   * INSERT through a REST API: one request per input row (POST unless
   * `apiMethod` says otherwise).
   *
   * @throws Error when no endpoint is configured; request errors are logged
   *   and rethrown.
   */
  private static async executeApiInsert(
    node: FlowNode,
    inputData: any,
    context: ExecutionContext
  ): Promise<any> {
    const {
      apiEndpoint,
      apiMethod,
      apiAuthType,
      apiAuthConfig,
      apiHeaders,
      apiBodyTemplate,
      fieldMappings,
    } = node.data;

    if (!apiEndpoint) {
      throw new Error("API 엔드포인트가 설정되지 않았습니다.");
    }

    logger.info(`🌐 REST API INSERT 시작: ${apiMethod} ${apiEndpoint}`);

    const axios = require("axios");
    const dataArray = Array.isArray(inputData) ?
inputData : [inputData];
    const results: any[] = [];

    for (const data of dataArray) {
      // Base headers
      const headers: any = { ...apiHeaders };

      // Authentication header
      if (apiAuthType === "bearer" && apiAuthConfig?.token) {
        headers["Authorization"] = `Bearer ${apiAuthConfig.token}`;
      } else if (
        apiAuthType === "basic" &&
        apiAuthConfig?.username &&
        apiAuthConfig?.password
      ) {
        const credentials = Buffer.from(
          `${apiAuthConfig.username}:${apiAuthConfig.password}`
        ).toString("base64");
        headers["Authorization"] = `Basic ${credentials}`;
      } else if (apiAuthType === "apikey" && apiAuthConfig?.apiKey) {
        const headerName = apiAuthConfig.apiKeyHeader || "X-API-Key";
        headers[headerName] = apiAuthConfig.apiKey;
      }

      // Default Content-Type
      if (!headers["Content-Type"]) {
        headers["Content-Type"] = "application/json";
      }

      // Build the body: template first, then field mappings, else the row
      let body: any;

      if (apiBodyTemplate) {
        // Substitute template variables
        body = this.replaceTemplateVariables(apiBodyTemplate, data);
      } else if (fieldMappings && fieldMappings.length > 0) {
        // Use the field mappings
        body = {};
        fieldMappings.forEach((mapping: any) => {
          const value =
            mapping.staticValue !== undefined
              ? mapping.staticValue
              : data[mapping.sourceField];
          body[mapping.targetField] = value;
        });
      } else {
        // Send the whole row
        body = data;
      }

      try {
        const response = await axios({
          method: apiMethod || "POST",
          url: apiEndpoint,
          headers,
          data: body,
          timeout: 30000, // 30 second timeout
        });

        results.push({
          status: response.status,
          data: response.data,
        });
      } catch (error: any) {
        logger.error(
          `❌ API 요청 실패: ${error.response?.status || error.message}`
        );
        throw error;
      }
    }

    logger.info(`✅ REST API INSERT 완료: ${results.length}건`);

    return { results };
  }

  /**
   * Replace every `{{path}}` placeholder in `template` with the value found at
   * that (dot-separated) path in `data`.
   *
   * The substitution is done in a single pass with a replacer function, so a
   * value is inserted literally: `$&`/`$1` sequences inside a value are not
   * interpreted, and `{{...}}` text that comes from a value is not substituted
   * again. Undefined values become the empty string; other values are
   * converted with `String()`.
   *
   * @param template - Text containing `{{variable}}` placeholders.
   * @param data - Object the placeholder paths are resolved against.
   * @returns The template with all placeholders substituted.
   */
  private static replaceTemplateVariables(template: string, data: any): string {
    return template.replace(/\{\{([^}]+)\}\}/g, (placeholder) => {
      const key = placeholder.replace(/\{\{|\}\}/g, "").trim();
      const value = this.getNestedValue(data, key);
      return value !== undefined ? String(value) : "";
    });
  }

  /**
   * Read a nested value by dot-separated path (e.g. "user.name"); yields
   * `undefined` as soon as a segment is missing.
   */
  private static getNestedValue(obj: any, path: string): any {
    return path.split(".").reduce((current, key) => current?.[key], obj);
  }

  /**
   * Create a connector for an external database (shared by all external
   * actions): loads the connection row, decrypts its password and asks
   * `DatabaseConnectorFactory` for a connector.
   *
   * @throws Error when the connection row does not exist.
   */
  private static async createExternalConnector(
    connectionId: number,
    dbType: string
  ): Promise<any> {
    // Look up the external DB connection
    const connectionData: any = await queryOne(
      "SELECT * FROM external_db_connections WHERE id = $1",
      [connectionId]
    );

    if (!connectionData) {
      throw new Error(`외부 DB 커넥션을 찾을 수 없습니다: ${connectionId}`);
    }

    // Decrypt the stored password
    const { EncryptUtil } = await import("../utils/encryptUtil");
    const decryptedPassword = EncryptUtil.decrypt(connectionData.password);

    const config = {
      host: connectionData.host,
      port: connectionData.port,
      database: connectionData.database_name,
      user: connectionData.username,
      password: decryptedPassword,
    };

    // Connect through DatabaseConnectorFactory
    const { DatabaseConnectorFactory } = await import(
      "../database/DatabaseConnectorFactory"
    );

    return await
DatabaseConnectorFactory.createConnector(
      dbType,
      config,
      connectionId
    );
  }

  /**
   * Commit an external transaction. Only acts when `dbType` is "oracle"
   * (case-insensitive) and at least one row was affected.
   */
  private static async commitExternalTransaction(
    connector: any,
    dbType: string,
    count: number
  ): Promise<void> {
    // `?.` keeps a missing dbType from raising a TypeError here
    if (dbType?.toLowerCase() === "oracle" && count > 0) {
      await connector.executeQuery("COMMIT");
      logger.info(`✅ Oracle COMMIT 실행: ${count}건`);
    }
  }

  /**
   * Roll back an external transaction. Only acts when `dbType` is "oracle"
   * (case-insensitive). Best effort: this helper is called from `catch`
   * blocks, so it never throws — a failing ROLLBACK is logged, and a missing
   * `dbType` is treated as "not oracle" instead of raising a TypeError that
   * would mask the original error.
   */
  private static async rollbackExternalTransaction(
    connector: any,
    dbType: string
  ): Promise<void> {
    if (dbType?.toLowerCase() === "oracle") {
      try {
        await connector.executeQuery("ROLLBACK");
        logger.info(`⚠️ Oracle ROLLBACK 실행 (오류 발생)`);
      } catch (rollbackError) {
        logger.error(`❌ Oracle ROLLBACK 실패:`, rollbackError);
      }
    }
  }

  /**
   * Execute an UPDATE action node by dispatching on `targetType`
   * ("internal", "external", "api"; anything else is treated as internal).
   */
  private static async executeUpdateAction(
    node: FlowNode,
    inputData: any,
    context: ExecutionContext
  ): Promise<any> {
    const { targetType } = node.data;

    // Branch by target type
    switch (targetType) {
      case "internal":
        return this.executeInternalUpdate(node, inputData, context);

      case "external":
        return this.executeExternalUpdate(node, inputData, context);

      case "api":
        return this.executeApiUpdate(node, inputData, context);

      default:
        // Backward compatibility: a missing targetType means internal
        logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
        return this.executeInternalUpdate(node, inputData, context);
    }
  }

  /**
   * UPDATE the internal database, one statement per input row, inside a
   * single `transaction` callback.
   *
   * @throws Error when the target table or the field mappings are missing.
   */
  private static async executeInternalUpdate(
    node: FlowNode,
    inputData: any,
    context: ExecutionContext
  ): Promise<any> {
    const { targetTable, fieldMappings, whereConditions } = node.data;

    // Fail with a clear message before opening a transaction instead of a
    // TypeError on `fieldMappings.forEach` or an empty SET list in SQL.
    if (
      !targetTable ||
      !Array.isArray(fieldMappings) ||
      fieldMappings.length === 0
    ) {
      throw new Error("UPDATE 액션에 필수 설정이 누락되었습니다.");
    }

    return transaction(async (client) => {
      const dataArray = Array.isArray(inputData) ? inputData : [inputData];
      let updatedCount = 0;

      for (const data of dataArray) {
        const setClauses: string[] = [];
        const values: any[] = [];
        let paramIndex = 1;

        fieldMappings.forEach((mapping: any) => {
          // A static value wins over the value read from the row
          const value =
            mapping.staticValue !== undefined ?
mapping.staticValue : data[mapping.sourceField];
          setClauses.push(`${mapping.targetField} = $${paramIndex}`);
          values.push(value);
          paramIndex++;
        });

        // NOTE(review): no WHERE values are appended to `values` here, so the
        // helper presumably inlines them into the clause — confirm against
        // buildWhereClause.
        const whereClause = this.buildWhereClause(
          whereConditions,
          data,
          paramIndex
        );

        const sql = ` UPDATE ${targetTable} SET ${setClauses.join(", ")} ${whereClause} `;

        const result = await client.query(sql, values);
        updatedCount += result.rowCount || 0;
      }

      logger.info(
        `✅ UPDATE 완료 (내부 DB): ${targetTable}, ${updatedCount}건`
      );

      return { updatedCount };
    });
  }

  /**
   * UPDATE an external database, one statement per input row.
   *
   * SET values come from the field mappings; WHERE values are read from the
   * row by each condition's `field`. Placeholder syntax follows
   * `externalDbType`: `:n` for oracle, `?` for mysql/mariadb, `@pn` for mssql
   * and `$n` otherwise. The connector is always disconnected; on error the
   * rollback helper is invoked before rethrowing.
   *
   * @throws Error when the connection id or target table is not configured.
   */
  private static async executeExternalUpdate(
    node: FlowNode,
    inputData: any,
    context: ExecutionContext
  ): Promise<any> {
    const {
      externalConnectionId,
      externalDbType,
      externalTargetTable,
      fieldMappings,
      whereConditions,
    } = node.data;

    if (!externalConnectionId || !externalTargetTable) {
      throw new Error("외부 DB 커넥션 또는 테이블이 설정되지 않았습니다.");
    }

    logger.info(
      `🔌 외부 DB UPDATE 시작: ${externalDbType} - ${externalTargetTable}`
    );

    // Create the external DB connector
    const connector = await this.createExternalConnector(
      externalConnectionId,
      externalDbType
    );

    try {
      // The dialect does not change between rows, so resolve it once.
      const dbType = externalDbType.toLowerCase();
      const placeholderFor = (position: number): string => {
        if (dbType === "oracle") {
          return `:${position}`;
        }
        if (dbType === "mysql" || dbType === "mariadb") {
          return "?";
        }
        if (dbType === "mssql") {
          return `@p${position}`;
        }
        return `$${position}`;
      };

      // Without conditions each statement touches every row of the table;
      // keep the behaviour but make it visible in the log.
      if (!whereConditions || whereConditions.length === 0) {
        logger.warn(
          `⚠️ WHERE 조건 없이 UPDATE 실행 (외부 DB): ${externalTargetTable}`
        );
      }

      const dataArray = Array.isArray(inputData) ? inputData : [inputData];
      let updatedCount = 0;

      for (const data of dataArray) {
        const setClauses: string[] = [];
        const values: any[] = [];
        let paramIndex = 1;

        fieldMappings.forEach((mapping: any) => {
          const value =
            mapping.staticValue !== undefined
              ? mapping.staticValue
              : data[mapping.sourceField];

          setClauses.push(
            `${mapping.targetField} = ${placeholderFor(paramIndex)}`
          );
          values.push(value);
          paramIndex++;
        });

        // Build the WHERE conditions (parameter numbering continues)
        const whereClauses: string[] = [];
        whereConditions?.forEach((condition: any) => {
          const condValue = data[condition.field];

          if (condition.operator === "IS NULL") {
            whereClauses.push(`${condition.field} IS NULL`);
          } else if (condition.operator === "IS NOT NULL") {
            whereClauses.push(`${condition.field} IS NOT NULL`);
          } else {
            whereClauses.push(
              `${condition.field} ${condition.operator} ${placeholderFor(paramIndex)}`
            );
            values.push(condValue);
            paramIndex++;
          }
        });

        const whereClause =
          whereClauses.length > 0
            ? `WHERE ${whereClauses.join(" AND ")}`
            : "";

        const sql = `UPDATE ${externalTargetTable} SET ${setClauses.join(", ")} ${whereClause}`;

        const result = await connector.executeQuery(sql, values);
        updatedCount += result.rowCount || result.affectedRows || 0;
      }

      // Explicit COMMIT (the helper only acts for Oracle)
      await this.commitExternalTransaction(
        connector,
        externalDbType,
        updatedCount
      );

      logger.info(
        `✅ UPDATE 완료 (외부 DB): ${externalTargetTable}, ${updatedCount}건`
      );

      return { updatedCount };
    } catch (error) {
      // ROLLBACK on error (the helper only acts for Oracle)
      await this.rollbackExternalTransaction(connector, externalDbType);
      throw error;
    } finally {
      await connector.disconnect();
    }
  }

  /**
   * UPDATE through a REST API: one request per input row (PUT unless
   * `apiMethod` says otherwise).
   *
   * @throws Error when no endpoint is configured; request errors are logged
   *   and rethrown.
   */
  private static async executeApiUpdate(
    node: FlowNode,
    inputData: any,
    context: ExecutionContext
  ): Promise<any> {
    const {
      apiEndpoint,
      apiMethod,
      apiAuthType,
      apiAuthConfig,
      apiHeaders,
      apiBodyTemplate,
      fieldMappings,
    } = node.data;

    if (!apiEndpoint) {
      throw new Error("API 엔드포인트가 설정되지 않았습니다.");
    }

    logger.info(`🌐 REST API UPDATE 시작: ${apiMethod} ${apiEndpoint}`);

    const axios = require("axios");
    const dataArray = Array.isArray(inputData) ?
inputData : [inputData];
    const results: any[] = [];

    for (const data of dataArray) {
      // Base headers
      const headers: any = { ...apiHeaders };

      // Authentication header
      if (apiAuthType === "bearer" && apiAuthConfig?.token) {
        headers["Authorization"] = `Bearer ${apiAuthConfig.token}`;
      } else if (
        apiAuthType === "basic" &&
        apiAuthConfig?.username &&
        apiAuthConfig?.password
      ) {
        const credentials = Buffer.from(
          `${apiAuthConfig.username}:${apiAuthConfig.password}`
        ).toString("base64");
        headers["Authorization"] = `Basic ${credentials}`;
      } else if (apiAuthType === "apikey" && apiAuthConfig?.apiKey) {
        const headerName = apiAuthConfig.apiKeyHeader || "X-API-Key";
        headers[headerName] = apiAuthConfig.apiKey;
      }

      if (!headers["Content-Type"]) {
        headers["Content-Type"] = "application/json";
      }

      // Build the body: template first, then field mappings, else the row
      let body: any;

      if (apiBodyTemplate) {
        body = this.replaceTemplateVariables(apiBodyTemplate, data);
      } else if (fieldMappings && fieldMappings.length > 0) {
        body = {};
        fieldMappings.forEach((mapping: any) => {
          const value =
            mapping.staticValue !== undefined
              ? mapping.staticValue
              : data[mapping.sourceField];
          body[mapping.targetField] = value;
        });
      } else {
        body = data;
      }

      // Updates normally address one resource, so resolve {{variable}}
      // placeholders in the endpoint per row (e.g. /api/users/{{id}}), the
      // same way the DELETE executor does. Endpoints without placeholders
      // are sent unchanged.
      const url =
        typeof apiEndpoint === "string"
          ? this.replaceTemplateVariables(apiEndpoint, data)
          : apiEndpoint;

      try {
        const response = await axios({
          method: apiMethod || "PUT",
          url,
          headers,
          data: body,
          timeout: 30000,
        });

        results.push({
          status: response.status,
          data: response.data,
        });
      } catch (error: any) {
        logger.error(
          `❌ API 요청 실패: ${error.response?.status || error.message}`
        );
        throw error;
      }
    }

    logger.info(`✅ REST API UPDATE 완료: ${results.length}건`);

    return { results };
  }

  /**
   * Execute a DELETE action node by dispatching on `targetType`
   * ("internal", "external", "api"; anything else is treated as internal).
   */
  private static async executeDeleteAction(
    node: FlowNode,
    inputData: any,
    context: ExecutionContext
  ): Promise<any> {
    const { targetType } = node.data;

    // Branch by target type
    switch (targetType) {
      case "internal":
        return this.executeInternalDelete(node, inputData, context);

      case "external":
        return this.executeExternalDelete(node, inputData, context);

      case "api":
        return this.executeApiDelete(node, inputData, context);

      default:
        // Backward compatibility: a missing targetType means internal
        logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
        return this.executeInternalDelete(node, inputData, context);
    }
  }

  /**
   * DELETE from the internal database, one statement per input row, inside a
   * single `transaction` callback.
   */
  private static async executeInternalDelete(
    node: FlowNode,
    inputData: any,
    context: ExecutionContext
  ): Promise<any> {
    const { targetTable, whereConditions } = node.data;

    return transaction(async (client) => {
      const dataArray = Array.isArray(inputData) ?
inputData : [inputData];
      let deletedCount = 0;

      for (const data of dataArray) {
        const whereClause = this.buildWhereClause(whereConditions, data, 1);

        // Same protection as the external DELETE: never issue a DELETE
        // without a WHERE clause, which would wipe the whole table.
        if (!whereClause || !String(whereClause).trim()) {
          throw new Error(
            "DELETE 작업에 WHERE 조건이 필요합니다. (전체 삭제 방지)"
          );
        }

        const sql = `DELETE FROM ${targetTable} ${whereClause}`;

        const result = await client.query(sql, []);
        deletedCount += result.rowCount || 0;
      }

      logger.info(
        `✅ DELETE 완료 (내부 DB): ${targetTable}, ${deletedCount}건`
      );

      return { deletedCount };
    });
  }

  /**
   * DELETE from an external database, one statement per input row.
   *
   * WHERE values are read from the row by each condition's `field`.
   * Placeholder syntax follows `externalDbType`: `:n` for oracle, `?` for
   * mysql/mariadb, `@pn` for mssql and `$n` otherwise. The connector is always
   * disconnected; on error the rollback helper is invoked before rethrowing.
   *
   * @throws Error when the connection id or target table is not configured,
   *   or when a row would be deleted without any WHERE condition.
   */
  private static async executeExternalDelete(
    node: FlowNode,
    inputData: any,
    context: ExecutionContext
  ): Promise<any> {
    const {
      externalConnectionId,
      externalDbType,
      externalTargetTable,
      whereConditions,
    } = node.data;

    if (!externalConnectionId || !externalTargetTable) {
      throw new Error("외부 DB 커넥션 또는 테이블이 설정되지 않았습니다.");
    }

    logger.info(
      `🔌 외부 DB DELETE 시작: ${externalDbType} - ${externalTargetTable}`
    );

    // Create the external DB connector
    const connector = await this.createExternalConnector(
      externalConnectionId,
      externalDbType
    );

    try {
      // The dialect does not change between rows, so resolve it once.
      const dbType = externalDbType.toLowerCase();
      const placeholderFor = (position: number): string => {
        if (dbType === "oracle") {
          return `:${position}`;
        }
        if (dbType === "mysql" || dbType === "mariadb") {
          return "?";
        }
        if (dbType === "mssql") {
          return `@p${position}`;
        }
        return `$${position}`;
      };

      const dataArray = Array.isArray(inputData) ? inputData : [inputData];
      let deletedCount = 0;

      for (const data of dataArray) {
        const whereClauses: string[] = [];
        const values: any[] = [];
        let paramIndex = 1;

        // Build the WHERE conditions
        whereConditions?.forEach((condition: any) => {
          const condValue = data[condition.field];

          if (condition.operator === "IS NULL") {
            whereClauses.push(`${condition.field} IS NULL`);
          } else if (condition.operator === "IS NOT NULL") {
            whereClauses.push(`${condition.field} IS NOT NULL`);
          } else {
            whereClauses.push(
              `${condition.field} ${condition.operator} ${placeholderFor(paramIndex)}`
            );
            values.push(condValue);
            paramIndex++;
          }
        });

        const whereClause =
          whereClauses.length > 0
            ? `WHERE ${whereClauses.join(" AND ")}`
            : "";

        if (!whereClause) {
          throw new Error(
            "DELETE 작업에 WHERE 조건이 필요합니다. (전체 삭제 방지)"
          );
        }

        const sql = `DELETE FROM ${externalTargetTable} ${whereClause}`;

        const result = await connector.executeQuery(sql, values);
        deletedCount += result.rowCount || result.affectedRows || 0;
      }

      // Explicit COMMIT (the helper only acts for Oracle)
      await this.commitExternalTransaction(
        connector,
        externalDbType,
        deletedCount
      );

      logger.info(
        `✅ DELETE 완료 (외부 DB): ${externalTargetTable}, ${deletedCount}건`
      );

      return { deletedCount };
    } catch (error) {
      // ROLLBACK on error (the helper only acts for Oracle)
      await this.rollbackExternalTransaction(connector, externalDbType);
      throw error;
    } finally {
      await connector.disconnect();
    }
  }

  /**
   * DELETE through a REST API: one DELETE request per input row.
   *
   * @throws Error when no endpoint is configured; request errors are logged
   *   and rethrown.
   */
  private static async executeApiDelete(
    node: FlowNode,
    inputData: any,
    context: ExecutionContext
  ): Promise<any> {
    const { apiEndpoint, apiAuthType, apiAuthConfig, apiHeaders } = node.data;

    if (!apiEndpoint) {
      throw new Error("API 엔드포인트가 설정되지 않았습니다.");
    }

    logger.info(`🌐 REST API DELETE 시작: ${apiEndpoint}`);

    const axios = require("axios");
    const dataArray = Array.isArray(inputData) ?
inputData : [inputData];
    const results: any[] = [];

    for (const data of dataArray) {
      // Base headers
      const headers: any = { ...apiHeaders };

      // Authentication header
      if (apiAuthType === "bearer" && apiAuthConfig?.token) {
        headers["Authorization"] = `Bearer ${apiAuthConfig.token}`;
      } else if (
        apiAuthType === "basic" &&
        apiAuthConfig?.username &&
        apiAuthConfig?.password
      ) {
        const credentials = Buffer.from(
          `${apiAuthConfig.username}:${apiAuthConfig.password}`
        ).toString("base64");
        headers["Authorization"] = `Basic ${credentials}`;
      } else if (apiAuthType === "apikey" && apiAuthConfig?.apiKey) {
        const headerName = apiAuthConfig.apiKeyHeader || "X-API-Key";
        headers[headerName] = apiAuthConfig.apiKey;
      }

      // DELETE usually carries the id in the URL path or query string, so
      // template variables are supported there (e.g. /api/users/{{id}})
      const url = this.replaceTemplateVariables(apiEndpoint, data);

      try {
        const response = await axios({
          method: "DELETE",
          url,
          headers,
          timeout: 30000,
        });

        results.push({
          status: response.status,
          data: response.data,
        });
      } catch (error: any) {
        logger.error(
          `❌ API 요청 실패: ${error.response?.status || error.message}`
        );
        throw error;
      }
    }

    logger.info(`✅ REST API DELETE 완료: ${results.length}건`);

    return { results };
  }

  /**
   * Execute an UPSERT action node by dispatching on `targetType`
   * ("internal", "external", "api"; anything else is treated as internal).
   */
  private static async executeUpsertAction(
    node: FlowNode,
    inputData: any,
    context: ExecutionContext
  ): Promise<any> {
    const { targetType } = node.data;

    // Branch by target type
    switch (targetType) {
      case "internal":
        return this.executeInternalUpsert(node, inputData, context);

      case "external":
        return this.executeExternalUpsert(node, inputData, context);

      case "api":
        return this.executeApiUpsert(node, inputData, context);

      default:
        // Backward compatibility: a missing targetType means internal
        logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
        return this.executeInternalUpsert(node, inputData, context);
    }
  }

  /**
   * Logic-based UPSERT on the internal database.
   *
   * Implemented as SELECT followed by UPDATE or INSERT, so it does not rely
   * on a unique constraint. All rows are processed inside a single
   * `transaction` callback.
   *
   * For an existing row only the non-conflict-key fields are updated. When
   * every mapped field is a conflict key there is nothing to change; the row
   * is still counted in `updatedCount` so `totalCount` equals the number of
   * processed rows.
   *
   * @returns `{ insertedCount, updatedCount, totalCount }`.
   * @throws Error when the table, field mappings or conflict keys are missing.
   */
  private static async executeInternalUpsert(
    node: FlowNode,
    inputData: any,
    context: ExecutionContext
  ): Promise<any> {
    const { targetTable, fieldMappings, conflictKeys } = node.data;

    if (!targetTable || !fieldMappings || fieldMappings.length === 0) {
      throw new Error("UPSERT 액션에 필수 설정이 누락되었습니다.");
    }

    if (!conflictKeys || conflictKeys.length === 0) {
      throw new Error("UPSERT 액션에 충돌 키(Conflict Keys)가 필요합니다.");
    }

    // A conflict key without a mapping has no value to compare with, so the
    // existence check can never match; surface this configuration problem.
    const unmappedKeys = conflictKeys.filter(
      (key: string) => !fieldMappings.some((m: any) => m.targetField === key)
    );
    if (unmappedKeys.length > 0) {
      logger.warn(
        `⚠️ 필드 매핑이 없는 충돌 키: ${unmappedKeys.join(", ")}`
      );
    }

    return transaction(async (client) => {
      const dataArray = Array.isArray(inputData) ? inputData : [inputData];
      let insertedCount = 0;
      let updatedCount = 0;

      for (const data of dataArray) {
        // 1. Extract the conflict key values for this row
        const conflictKeyValues: Record<string, any> = {};
        conflictKeys.forEach((key: string) => {
          const mapping = fieldMappings.find((m: any) => m.targetField === key);
          if (mapping) {
            conflictKeyValues[key] =
              mapping.staticValue !== undefined
                ? mapping.staticValue
                : data[mapping.sourceField];
          }
        });

        // 2. Existence check (SELECT)
        const whereConditions = conflictKeys
          .map((key: string, index: number) => `${key} = $${index + 1}`)
          .join(" AND ");
        const whereValues = conflictKeys.map(
          (key: string) => conflictKeyValues[key]
        );

        // Only the row count is used, so select a constant instead of
        // assuming the table has an `id` column.
        const checkSql = `SELECT 1 FROM ${targetTable} WHERE ${whereConditions} LIMIT 1`;
        const existingRow = await client.query(checkSql, whereValues);

        if (existingRow.rows.length > 0) {
          // 3-A. Row exists: UPDATE
          const setClauses: string[] = [];
          const updateValues: any[] = [];
          let paramIndex = 1;

          fieldMappings.forEach((mapping: any) => {
            // Only fields that are not conflict keys are updated
            if (!conflictKeys.includes(mapping.targetField)) {
              const value =
                mapping.staticValue !== undefined
                  ? mapping.staticValue
                  : data[mapping.sourceField];
              setClauses.push(`${mapping.targetField} = $${paramIndex}`);
              updateValues.push(value);
              paramIndex++;
            }
          });

          // Nothing but conflict keys is mapped: an UPDATE with an empty SET
          // list would be invalid SQL, and the row already matches.
          if (setClauses.length === 0) {
            updatedCount++;
            continue;
          }

          // WHERE conditions (parameter numbering continues after SET)
          const updateWhereConditions = conflictKeys
            .map(
              (key: string, index: number) => `${key} = $${paramIndex + index}`
            )
            .join(" AND ");

          // Append the WHERE values
          whereValues.forEach((val: any) => {
            updateValues.push(val);
          });

          const updateSql = ` UPDATE ${targetTable} SET ${setClauses.join(", ")} WHERE ${updateWhereConditions} `;

          logger.info(`🔄 UPDATE 실행:`, {
            table: targetTable,
            conflictKeys,
            conflictKeyValues,
            sql: updateSql,
            values: updateValues,
          });

          await client.query(updateSql, updateValues);
          updatedCount++;
        } else {
          // 3-B. Row does not exist: INSERT
          const columns: string[] = [];
          const values: any[] = [];

          fieldMappings.forEach((mapping: any) => {
            const value =
              mapping.staticValue !== undefined
                ? mapping.staticValue
                : data[mapping.sourceField];
            columns.push(mapping.targetField);
            values.push(value);
          });

          const placeholders = values.map((_, i) => `$${i + 1}`).join(", ");
          const insertSql = ` INSERT INTO ${targetTable} (${columns.join(", ")}) VALUES (${placeholders}) `;

          logger.info(`➕ INSERT 실행:`, {
            table: targetTable,
            conflictKeys,
            conflictKeyValues,
          });

          await client.query(insertSql, values);
          insertedCount++;
        }
      }

      logger.info(
        `✅ UPSERT 완료 (내부 DB): ${targetTable}, INSERT ${insertedCount}건, UPDATE ${updatedCount}건`
      );

      return {
        insertedCount,
        updatedCount,
        totalCount: insertedCount + updatedCount,
      };
    });
  }

  /**
   * Logic-based UPSERT on an external database.
   */
  private static async executeExternalUpsert(
    node: FlowNode,
    inputData: any,
    context: ExecutionContext
  ): Promise<any> {
    const {
      externalConnectionId,
      externalDbType,
      externalTargetTable,
      fieldMappings,
      conflictKeys,
    } = node.data;

    if (!externalConnectionId || !externalTargetTable) {
      throw new Error("외부 DB 커넥션 또는 테이블이 설정되지 않았습니다.");
    }

    if (!fieldMappings || fieldMappings.length === 0) {
      throw new Error("UPSERT 액션에 필수 설정이 누락되었습니다.");
    }

    if (!conflictKeys || conflictKeys.length === 0) {
throw new Error("UPSERT 액션에 충돌 키(Conflict Keys)가 필요합니다."); } logger.info( `🔌 외부 DB UPSERT 시작: ${externalDbType} - ${externalTargetTable}` ); // 외부 DB 커넥터 생성 const connector = await this.createExternalConnector( externalConnectionId, externalDbType ); try { const dataArray = Array.isArray(inputData) ? inputData : [inputData]; let insertedCount = 0; let updatedCount = 0; for (const data of dataArray) { // 1. 충돌 키 값 추출 const conflictKeyValues: Record = {}; conflictKeys.forEach((key: string) => { const mapping = fieldMappings.find((m: any) => m.targetField === key); if (mapping) { conflictKeyValues[key] = mapping.staticValue !== undefined ? mapping.staticValue : data[mapping.sourceField]; } }); // 2. 존재 여부 확인 (SELECT) const whereClauses: string[] = []; const whereValues: any[] = []; let paramIndex = 1; conflictKeys.forEach((key: string) => { if (externalDbType.toLowerCase() === "oracle") { whereClauses.push(`${key} = :${paramIndex}`); } else if ( ["mysql", "mariadb"].includes(externalDbType.toLowerCase()) ) { whereClauses.push(`${key} = ?`); } else if (externalDbType.toLowerCase() === "mssql") { whereClauses.push(`${key} = @p${paramIndex}`); } else { whereClauses.push(`${key} = $${paramIndex}`); } whereValues.push(conflictKeyValues[key]); paramIndex++; }); const checkSql = `SELECT * FROM ${externalTargetTable} WHERE ${whereClauses.join(" AND ")} LIMIT 1`; const existingRow = await connector.executeQuery(checkSql, whereValues); const hasExistingRow = existingRow.rows?.length > 0 || existingRow.length > 0; if (hasExistingRow) { // 3-A. 존재하면 UPDATE const setClauses: string[] = []; const updateValues: any[] = []; paramIndex = 1; fieldMappings.forEach((mapping: any) => { if (!conflictKeys.includes(mapping.targetField)) { const value = mapping.staticValue !== undefined ? 
mapping.staticValue : data[mapping.sourceField]; if (externalDbType.toLowerCase() === "oracle") { setClauses.push(`${mapping.targetField} = :${paramIndex}`); } else if ( ["mysql", "mariadb"].includes(externalDbType.toLowerCase()) ) { setClauses.push(`${mapping.targetField} = ?`); } else if (externalDbType.toLowerCase() === "mssql") { setClauses.push(`${mapping.targetField} = @p${paramIndex}`); } else { setClauses.push(`${mapping.targetField} = $${paramIndex}`); } updateValues.push(value); paramIndex++; } }); // WHERE 조건 생성 const updateWhereClauses: string[] = []; conflictKeys.forEach((key: string) => { if (externalDbType.toLowerCase() === "oracle") { updateWhereClauses.push(`${key} = :${paramIndex}`); } else if ( ["mysql", "mariadb"].includes(externalDbType.toLowerCase()) ) { updateWhereClauses.push(`${key} = ?`); } else if (externalDbType.toLowerCase() === "mssql") { updateWhereClauses.push(`${key} = @p${paramIndex}`); } else { updateWhereClauses.push(`${key} = $${paramIndex}`); } updateValues.push(conflictKeyValues[key]); paramIndex++; }); const updateSql = `UPDATE ${externalTargetTable} SET ${setClauses.join(", ")} WHERE ${updateWhereClauses.join(" AND ")}`; await connector.executeQuery(updateSql, updateValues); updatedCount++; } else { // 3-B. 없으면 INSERT const columns: string[] = []; const values: any[] = []; fieldMappings.forEach((mapping: any) => { const value = mapping.staticValue !== undefined ? 
/**
 * Runs an UPSERT through a REST API, sending one request per input row.
 *
 * The request body is taken, in order of precedence, from the body template
 * (with template variables replaced from the row), from the field mappings,
 * or from the raw row itself. The HTTP method defaults to PUT.
 *
 * @param node      Flow node whose `data` carries the endpoint, method,
 *                  auth settings, extra headers, body template and mappings.
 * @param inputData A single row or an array of rows.
 * @param context   Execution context (not read by this method).
 * @returns `{ results }`, one `{ status, data }` entry per successful request.
 * @throws Error when no endpoint is configured; a failed request is logged
 *         and rethrown, aborting the remaining rows.
 */
private static async executeApiUpsert(
  node: FlowNode,
  inputData: any,
  context: ExecutionContext
): Promise<any> {
  const {
    apiEndpoint,
    apiMethod,
    apiAuthType,
    apiAuthConfig,
    apiHeaders,
    apiBodyTemplate,
    fieldMappings,
  } = node.data;

  if (!apiEndpoint) {
    throw new Error("API 엔드포인트가 설정되지 않았습니다.");
  }

  logger.info(`🌐 REST API UPSERT 시작: ${apiMethod} ${apiEndpoint}`);

  const axios = require("axios");

  // Headers depend only on the node configuration, so build them once.
  const headers: Record<string, any> = { ...apiHeaders };

  // Authentication header.
  if (apiAuthType === "bearer" && apiAuthConfig?.token) {
    headers["Authorization"] = `Bearer ${apiAuthConfig.token}`;
  } else if (
    apiAuthType === "basic" &&
    apiAuthConfig?.username &&
    apiAuthConfig?.password
  ) {
    const credentials = Buffer.from(
      `${apiAuthConfig.username}:${apiAuthConfig.password}`
    ).toString("base64");
    headers["Authorization"] = `Basic ${credentials}`;
  } else if (apiAuthType === "apikey" && apiAuthConfig?.apiKey) {
    const headerName = apiAuthConfig.apiKeyHeader || "X-API-Key";
    headers[headerName] = apiAuthConfig.apiKey;
  }

  // Header names are case-insensitive: a configured "content-type" must
  // suppress the default instead of ending up next to a second header.
  const hasContentType = Object.keys(headers).some(
    (name) => name.toLowerCase() === "content-type" && headers[name]
  );
  if (!hasContentType) {
    headers["Content-Type"] = "application/json";
  }

  const dataArray = Array.isArray(inputData) ? inputData : [inputData];
  const results: any[] = [];

  for (const data of dataArray) {
    // Build the request body for this row.
    let body: any;
    if (apiBodyTemplate) {
      body = this.replaceTemplateVariables(apiBodyTemplate, data);
    } else if (fieldMappings && fieldMappings.length > 0) {
      body = {};
      fieldMappings.forEach((mapping: any) => {
        body[mapping.targetField] =
          mapping.staticValue !== undefined
            ? mapping.staticValue
            : data[mapping.sourceField];
      });
    } else {
      body = data;
    }

    try {
      // UPSERT normally uses PUT because it is idempotent.
      const response = await axios({
        method: apiMethod || "PUT",
        url: apiEndpoint,
        headers: { ...headers }, // per-request copy of the shared headers
        data: body,
        timeout: 30000,
      });

      results.push({
        status: response.status,
        data: response.data,
      });
    } catch (error: any) {
      logger.error(
        `❌ API 요청 실패: ${error.response?.status || error.message}`
      );
      throw error;
    }
  }

  logger.info(`✅ REST API UPSERT 완료: ${results.length}건`);

  return { results };
}
/**
 * Evaluates a condition node against the input data.
 *
 * Each configured condition compares `inputData[condition.field]` with
 * `condition.value` using `condition.operator`. The individual outcomes are
 * combined with OR when `logic === "OR"`, otherwise with AND.
 *
 * A node without conditions evaluates like an empty list: `true` for AND,
 * `false` for OR. A null/undefined `inputData` makes every field undefined
 * instead of throwing.
 *
 * @param node      Condition node; reads `data.conditions` and `data.logic`.
 * @param inputData Object whose fields are compared.
 * @param context   Execution context (not read by this method).
 * @returns The combined boolean result.
 */
private static async executeCondition(
  node: FlowNode,
  inputData: any,
  context: ExecutionContext
): Promise<boolean> {
  const { conditions, logic } = node.data;

  if (!Array.isArray(conditions)) {
    logger.warn(`⚠️ 조건 노드에 조건이 없습니다: ${node.id}`);
  }
  const conditionList: any[] = Array.isArray(conditions) ? conditions : [];

  const outcomes: boolean[] = conditionList.map((condition: any) =>
    this.evaluateCondition(
      inputData?.[condition.field],
      condition.operator,
      condition.value
    )
  );

  const result =
    logic === "OR"
      ? outcomes.some((passed) => passed)
      : outcomes.every((passed) => passed);

  logger.info(`🔍 조건 평가 결과: ${result}`);
  return result;
}

/**
 * Builds a parameterised SQL WHERE clause from a list of conditions.
 *
 * Every condition becomes `<field> <operator> $<n>`, with `n` counting up
 * from `startIndex`; the parts are joined with AND. Only the SQL text is
 * produced; the caller supplies the bound values in the same order.
 *
 * NOTE(review): `field` and `operator` are interpolated into the SQL text
 * as-is, so they must come from trusted configuration.
 *
 * @param conditions Conditions with `field` and `operator` properties.
 * @param data       Unused; kept so existing call sites keep compiling.
 * @param startIndex Number of the first `$n` placeholder (default 1).
 * @returns The clause including the `WHERE` keyword, or `""` when there are
 *          no conditions.
 */
private static buildWhereClause(
  conditions: any[],
  data?: any,
  startIndex: number = 1
): string {
  if (!conditions || conditions.length === 0) {
    return "";
  }

  const clauses = conditions.map(
    (condition, index) =>
      `${condition.field} ${condition.operator} $${startIndex + index}`
  );

  return `WHERE ${clauses.join(" AND ")}`;
}
/**
 * Evaluates a single comparison.
 *
 * Supported operators: `equals` / `=`, `notEquals` / `!=` (strict equality,
 * no type coercion), `greaterThan` / `>`, `lessThan` / `<`, and `contains`
 * (substring test on the stringified values). Any other operator yields
 * `false`.
 *
 * `contains` returns `false` for a null or undefined field value, so a
 * missing field never matches substrings of the text "null" / "undefined".
 *
 * @param fieldValue    Value read from the input data.
 * @param operator      Operator name or symbol.
 * @param expectedValue Value configured on the condition.
 * @returns Whether the comparison holds.
 */
private static evaluateCondition(
  fieldValue: any,
  operator: string,
  expectedValue: any
): boolean {
  switch (operator) {
    case "equals":
    case "=":
      return fieldValue === expectedValue;
    case "notEquals":
    case "!=":
      return fieldValue !== expectedValue;
    case "greaterThan":
    case ">":
      return fieldValue > expectedValue;
    case "lessThan":
    case "<":
      return fieldValue < expectedValue;
    case "contains":
      if (fieldValue === null || fieldValue === undefined) {
        return false;
      }
      return String(fieldValue).includes(String(expectedValue));
    default:
      return false;
  }
}

/**
 * Assembles the final execution report for a flow run.
 *
 * Produces one summary per node (nodes without a recorded result are
 * reported as "pending") and counts the successful, failed and skipped
 * nodes. The run counts as successful when no node failed.
 *
 * @param nodes         All nodes of the flow.
 * @param context       Execution context holding the per-node results.
 * @param executionTime Total run time, passed through unchanged.
 * @returns The execution result with message, node summaries and counters.
 */
private static generateExecutionResult(
  nodes: FlowNode[],
  context: ExecutionContext,
  executionTime: number
): ExecutionResult {
  const summary = { total: nodes.length, success: 0, failed: 0, skipped: 0 };
  const nodeSummaries: NodeExecutionSummary[] = [];

  for (const node of nodes) {
    const recorded = context.nodeResults.get(node.id);
    const status = recorded?.status || "pending";

    // "pending" nodes are counted in `total` only.
    if (status === "success") {
      summary.success++;
    } else if (status === "failed") {
      summary.failed++;
    } else if (status === "skipped") {
      summary.skipped++;
    }

    nodeSummaries.push({
      nodeId: node.id,
      nodeName: node.data.displayName || node.id,
      nodeType: node.type,
      status,
      // Duration is only known once the node has an end timestamp.
      duration: recorded?.endTime
        ? recorded.endTime - recorded.startTime
        : undefined,
      error: recorded?.error?.message,
    });
  }

  const success = summary.failed === 0;
  const message = success
    ? `플로우 실행 성공 (${summary.success}/${summary.total})`
    : `플로우 실행 부분 실패 (성공: ${summary.success}, 실패: ${summary.failed})`;

  return {
    success,
    message,
    executionTime,
    nodes: nodeSummaries,
    summary,
  };
}
/**
 * Executes a data-transform node.
 *
 * The input is normalised to an array of rows and every configured
 * transformation is applied in order, each one receiving the output of the
 * previous one. Without any transformations the normalised input is
 * returned unchanged.
 *
 * @param node      Transform node; reads `data.transformations`.
 * @param inputData A single row or an array of rows.
 * @param context   Execution context (not read by this method).
 * @returns The transformed rows.
 */
private static async executeDataTransform(
  node: FlowNode,
  inputData: any,
  context: ExecutionContext
): Promise<any[]> {
  const { transformations } = node.data;

  // Normalise inputData to an array.
  const rows = Array.isArray(inputData) ? inputData : [inputData];

  if (
    !transformations ||
    !Array.isArray(transformations) ||
    transformations.length === 0
  ) {
    logger.warn(`⚠️ 데이터 변환 노드에 변환 규칙이 없습니다: ${node.id}`);
    return rows;
  }

  logger.info(
    `🔄 데이터 변환 시작: ${rows.length}개 행, ${transformations.length}개 변환`
  );

  // Apply each rule in sequence.
  let transformedRows = rows;
  for (const transform of transformations) {
    const transformType = transform.type;
    logger.info(` 🔹 변환 적용: ${transformType}`);

    transformedRows = this.applyTransformation(transformedRows, transform);
  }

  logger.info(`✅ 데이터 변환 완료: ${transformedRows.length}개 행`);

  return transformedRows;
}

/**
 * Applies a single transformation rule to every row.
 *
 * The result is written to `targetField`, or back into `sourceField` when
 * no target is configured. Rows are copied, never mutated.
 *
 * Supported types: UPPERCASE, LOWERCASE, TRIM, EXPLODE, CONCAT, SPLIT,
 * REPLACE and CAST. FORMAT, CALCULATE, JSON_EXTRACT and CUSTOM are not
 * implemented yet and copy the source value; unknown types return the rows
 * untouched.
 *
 * Notes:
 * - CONCAT appends `separator` and the literal `expression` to the source
 *   value; `0` and `false` are kept rather than treated as empty.
 * - REPLACE interprets `searchValue` as a global regular expression. An
 *   empty `searchValue` leaves the value unchanged instead of inserting the
 *   replacement between every character.
 *
 * @param rows      Rows to transform.
 * @param transform Rule object (`type`, `sourceField`, `targetField`, …).
 * @returns A new array of rows (EXPLODE may change the row count).
 */
private static applyTransformation(rows: any[], transform: any): any[] {
  const {
    type,
    sourceField,
    targetField,
    delimiter,
    separator,
    searchValue,
    replaceValue,
    splitIndex,
    castType,
    expression,
  } = transform;

  // Empty target means in-place transformation of the source field.
  const actualTargetField = targetField || sourceField;

  // Copies each row and stores the computed value in the target field.
  const withTarget = (compute: (row: any) => any): any[] =>
    rows.map((row) => ({ ...row, [actualTargetField]: compute(row) }));

  switch (type) {
    case "UPPERCASE":
      return withTarget(
        (row) => row[sourceField]?.toString().toUpperCase() || row[sourceField]
      );

    case "LOWERCASE":
      return withTarget(
        (row) => row[sourceField]?.toString().toLowerCase() || row[sourceField]
      );

    case "TRIM":
      return withTarget(
        (row) => row[sourceField]?.toString().trim() || row[sourceField]
      );

    case "EXPLODE":
      return this.applyExplode(
        rows,
        sourceField,
        actualTargetField,
        delimiter || ","
      );

    case "CONCAT":
      // Simplified: the second operand is the literal `expression`.
      // `??` keeps a legitimate 0 / false that `||` would blank out.
      return withTarget(
        (row) =>
          `${row[sourceField] ?? ""}${separator || ""}${expression || ""}`
      );

    case "SPLIT": {
      const index = splitIndex !== undefined ? splitIndex : 0;
      return withTarget((row) => {
        const value = row[sourceField]?.toString() || "";
        const parts = value.split(delimiter || ",");
        return parts[index] || "";
      });
    }

    case "REPLACE": {
      if (!searchValue) {
        // An empty global pattern matches between every character.
        return withTarget((row) => row[sourceField]?.toString() || "");
      }
      // Compile once; String.prototype.replace resets lastIndex itself.
      const pattern = new RegExp(searchValue, "g");
      return withTarget((row) => {
        const value = row[sourceField]?.toString() || "";
        return value.replace(pattern, replaceValue || "");
      });
    }

    case "CAST":
      return withTarget((row) => {
        const value = row[sourceField];
        switch (castType) {
          case "string":
            return value?.toString() || "";
          case "number":
            return parseFloat(value) || 0; // NaN becomes 0
          case "boolean":
            return Boolean(value);
          case "date":
            return new Date(value);
          default:
            return value;
        }
      });

    case "FORMAT":
    case "CALCULATE":
    case "JSON_EXTRACT":
    case "CUSTOM":
      // Expression-based transformations: placeholder implementation.
      logger.warn(`⚠️ ${type} 변환은 아직 완전히 구현되지 않았습니다.`);
      return withTarget((row) => row[sourceField] ?? "");

    default:
      logger.warn(`⚠️ 지원하지 않는 변환 타입: ${type}`);
      return rows;
  }
}
/**
 * EXPLODE transformation: expands one row into N rows.
 *
 * The source field is stringified and split on `delimiter`; for every
 * trimmed piece a copy of the row is emitted with the piece stored in
 * `targetField`. Rows whose source value is falsy are passed through as-is.
 *
 * @param rows        Rows to expand.
 * @param sourceField Field holding the delimited value.
 * @param targetField Field that receives each separated value.
 * @param delimiter   Separator to split on.
 * @returns The expanded rows, in input order.
 */
private static applyExplode(
  rows: any[],
  sourceField: string,
  targetField: string,
  delimiter: string
): any[] {
  const exploded: any[] = rows.flatMap((original) => {
    const raw = original[sourceField];

    // Nothing to split: keep the original row object untouched.
    if (!raw) {
      return [original];
    }

    // One cloned row per separated, trimmed piece.
    return raw
      .toString()
      .split(delimiter)
      .map((piece: string) => ({
        ...original,
        [targetField]: piece.trim(),
      }));
  });

  logger.info(
    ` 💥 EXPLODE: ${rows.length}개 행 → ${exploded.length}개 행`
  );

  return exploded;
}