ERP-node/backend-node/src/services/nodeFlowExecutionService.ts

2405 lines
69 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* 노드 플로우 실행 엔진
*
* 기능:
* - 위상 정렬 (Topological Sort)
* - 레벨별 병렬 실행 (Promise.allSettled)
* - 독립 트랜잭션 처리
* - 연쇄 중단 (부모 실패 시 자식 스킵)
*/
import { query, queryOne, transaction } from "../database/db";
import { logger } from "../utils/logger";
import axios from "axios";
// ===== 타입 정의 =====
/** A single node of a flow graph. */
export interface FlowNode {
// Node identifier; FlowEdge.source / FlowEdge.target refer to this value.
id: string;
// Selects which handler runs when the node is executed.
type: NodeType;
// Optional 2D coordinates; not read by the execution code in this section
// (presumably editor layout — confirm).
position?: { x: number; y: number };
// Node-specific configuration (free-form, see NodeData).
data: NodeData;
}
/**
* All node kinds a flow may contain.
* NOTE(review): executeNodeByType has no case for "externalDBSource" or
* "fieldMapping"; those currently reach its default branch, which only logs
* a warning.
*/
export type NodeType =
| "tableSource"
| "externalDBSource"
| "restAPISource"
| "condition"
| "fieldMapping"
| "dataTransform"
| "insertAction"
| "updateAction"
| "deleteAction"
| "upsertAction"
| "comment"
| "log";
/**
* Free-form per-node configuration. Each node handler destructures the keys
* it needs (e.g. `targetTable`, `fieldMappings`, `url`); nothing is validated
* by the type system because of the index signature.
*/
export interface NodeData {
// Human-readable label; used in log output for comment/log nodes.
displayName?: string;
[key: string]: any;
}
/** A directed connection between two nodes: `source` runs before `target`. */
export interface FlowEdge {
id: string;
// Id of the upstream (parent) node.
source: string;
// Id of the downstream (child) node.
target: string;
// Handle identifiers; not read by the execution code in this section.
sourceHandle?: string;
targetHandle?: string;
}
/** Mutable state shared by every node during one flow execution. */
export interface ExecutionContext {
sourceData?: any[]; // Externally injected data (selected rows or form data)
dataSourceType?: string; // "table-selection" | "form" | "none"
// Result of every node processed so far, keyed by node id.
nodeResults: Map<string, NodeResult>;
// Ids of nodes whose execution promise fulfilled, in completion-recording order.
executionOrder: string[];
// Information about the UI button that triggered the flow, when available.
buttonContext?: ButtonContext;
}
/**
* Caller information captured by executeFlow from `contextData`
* (top-level keys first, then `contextData.context`).
*/
export interface ButtonContext {
// Falls back to "unknown" when the caller supplies no button id.
buttonId: string;
screenId?: number;
companyCode?: string;
userId?: string;
formData?: Record<string, any>;
selectedRowsData?: Record<string, any>[];
}
/** Outcome of executing (or skipping) one node. */
export interface NodeResult {
nodeId: string;
status: "pending" | "success" | "failed" | "skipped";
// Value returned by the node handler; passed to child nodes as their input.
data?: any;
// Failure cause, or the reason the node was skipped.
error?: Error;
// Timestamps taken with Date.now() (milliseconds since the Unix epoch).
startTime: number;
endTime?: number;
}
/**
* Aggregate result returned by executeFlow.
* NOTE(review): populated by generateExecutionResult, which is outside this
* section — field semantics below are inferred from names; confirm there.
*/
export interface ExecutionResult {
success: boolean;
message: string;
// Wall-clock duration of the whole flow; executeFlow passes Date.now() deltas (ms).
executionTime: number;
// One entry per node.
nodes: NodeExecutionSummary[];
// Node counts per final status.
summary: {
total: number;
success: number;
failed: number;
skipped: number;
};
}
/** Per-node entry of ExecutionResult.nodes. */
export interface NodeExecutionSummary {
nodeId: string;
nodeName: string;
nodeType: NodeType;
status: "success" | "failed" | "skipped" | "pending";
// Presumably endTime - startTime in milliseconds — confirm in generateExecutionResult.
duration?: number;
// Error message text, when the node failed or was skipped.
error?: string;
}
// ===== 메인 실행 서비스 =====
export class NodeFlowExecutionService {
/**
 * Entry point: loads a flow definition and executes it level by level.
 *
 * Steps: read the row from `node_flows`, parse `flow_data`, build the
 * execution context from `contextData`, sort the nodes topologically into
 * levels, run the levels in order, then summarize the node results.
 *
 * @param flowId - `flow_id` of the flow to run.
 * @param contextData - Caller-supplied data. Button-related values are read
 *   from the top level first and then from `contextData.context`.
 * @returns The summary produced by `generateExecutionResult`.
 * @throws Error when the flow row does not exist or its `flow_data` contains
 *   no `nodes` array. Every error is logged and re-thrown.
 */
static async executeFlow(
  flowId: number,
  contextData: Record<string, any>
): Promise<ExecutionResult> {
  const startTime = Date.now();
  try {
    logger.info(`🚀 플로우 실행 시작: flowId=${flowId}`);

    // 1. Load the flow definition
    const flow = await queryOne<{
      flow_id: number;
      flow_name: string;
      flow_data: any;
    }>(
      `SELECT flow_id, flow_name, flow_data FROM node_flows WHERE flow_id = $1`,
      [flowId]
    );
    if (!flow) {
      throw new Error(`플로우를 찾을 수 없습니다: flowId=${flowId}`);
    }

    // flow_data may arrive as a JSON string or as an already-parsed object
    const flowData =
      typeof flow.flow_data === "string"
        ? JSON.parse(flow.flow_data)
        : flow.flow_data;

    // Fail with a descriptive message instead of a TypeError on `.length`
    if (!flowData || !Array.isArray(flowData.nodes)) {
      throw new Error(`플로우 데이터가 올바르지 않습니다: flowId=${flowId}`);
    }
    const nodes: FlowNode[] = flowData.nodes;
    // A flow without any connection may legitimately omit `edges`
    const edges: FlowEdge[] = Array.isArray(flowData.edges)
      ? flowData.edges
      : [];

    logger.info(`📊 플로우 정보:`, {
      flowName: flow.flow_name,
      nodeCount: nodes.length,
      edgeCount: edges.length,
    });

    // 2. Build the execution context
    const nested = contextData.context;
    const context: ExecutionContext = {
      sourceData: contextData.sourceData || [],
      dataSourceType: contextData.dataSourceType || "none",
      nodeResults: new Map(),
      executionOrder: [],
      buttonContext: {
        buttonId: contextData.buttonId || nested?.buttonId || "unknown",
        screenId: contextData.screenId || nested?.screenId,
        companyCode: contextData.companyCode || nested?.companyCode,
        userId: contextData.userId || nested?.userId,
        formData: contextData.formData || nested?.formData,
        selectedRowsData:
          contextData.selectedRowsData || nested?.selectedRowsData,
      },
    };
    logger.info(`📦 실행 컨텍스트:`, {
      dataSourceType: context.dataSourceType,
      sourceDataCount: context.sourceData?.length || 0,
      buttonContext: context.buttonContext,
    });

    // 3. Topological sort into levels
    const levels = this.topologicalSort(nodes, edges);
    logger.info(`📋 실행 순서 (레벨별):`, levels);

    // 4. Levels run sequentially; nodes inside a level run in parallel
    for (const level of levels) {
      await this.executeLevel(level, nodes, edges, context);
    }

    // 5. Summarize
    const executionTime = Date.now() - startTime;
    const result = this.generateExecutionResult(
      nodes,
      context,
      executionTime
    );
    logger.info(`✅ 플로우 실행 완료:`, result.summary);
    return result;
  } catch (error) {
    logger.error(`❌ 플로우 실행 실패:`, error);
    throw error;
  }
}
/**
 * Chooses the initial data set according to `contextData.controlDataSource`.
 *
 * - "table-selection": the selected rows (or an empty array)
 * - "both": two tagged entries, one for the form and one for the table rows
 * - "form" and any other value: the form data wrapped in an array, or an
 *   empty array when there is no form data
 */
private static prepareSourceData(contextData: Record<string, any>): any[] {
  const mode = contextData.controlDataSource;
  const form = contextData.formData;
  const rows = contextData.selectedRowsData;

  if (mode === "table-selection") {
    return rows || [];
  }

  if (mode === "both") {
    return [
      { source: "form", data: form },
      { source: "table", data: rows },
    ];
  }

  // "form" and every unrecognized mode share the same behavior
  if (form) {
    return [form];
  }
  return [];
}
/**
 * Groups the nodes of the flow graph into execution levels (Kahn's
 * algorithm). Level 0 holds the nodes without incoming edges; every later
 * level holds the nodes whose parents all sit in earlier levels.
 *
 * Edges whose `source` or `target` is not a known node id are ignored with a
 * warning; counting them would either block the target forever or schedule a
 * node that does not exist. Nodes that can never be scheduled (members of a
 * cycle and everything downstream of one) are reported with a warning and
 * left out of the result.
 *
 * @param nodes - All nodes of the flow.
 * @param edges - All edges of the flow.
 * @returns Node ids grouped by level, in execution order.
 */
private static topologicalSort(
  nodes: FlowNode[],
  edges: FlowEdge[]
): string[][] {
  const levels: string[][] = [];
  const inDegree = new Map<string, number>();
  const adjacency = new Map<string, string[]>();

  for (const node of nodes) {
    inDegree.set(node.id, 0);
    adjacency.set(node.id, []);
  }

  // Count incoming edges, considering only edges between known nodes
  for (const edge of edges) {
    const successors = adjacency.get(edge.source);
    if (successors === undefined || !inDegree.has(edge.target)) {
      logger.warn(
        `⚠️ 존재하지 않는 노드를 참조하는 엣지를 무시합니다: ${edge.source} -> ${edge.target}`
      );
      continue;
    }
    successors.push(edge.target);
    inDegree.set(edge.target, (inDegree.get(edge.target) ?? 0) + 1);
  }

  let currentLevel = nodes
    .filter((node) => inDegree.get(node.id) === 0)
    .map((node) => node.id);

  while (currentLevel.length > 0) {
    levels.push([...currentLevel]);
    const nextLevel: string[] = [];
    for (const nodeId of currentLevel) {
      for (const neighbor of adjacency.get(nodeId) ?? []) {
        const remaining = (inDegree.get(neighbor) ?? 0) - 1;
        inDegree.set(neighbor, remaining);
        // The neighbor becomes runnable once its last parent is placed
        if (remaining === 0) {
          nextLevel.push(neighbor);
        }
      }
    }
    currentLevel = nextLevel;
  }

  // Anything with a positive in-degree left is part of, or behind, a cycle
  const unscheduled = nodes
    .filter((node) => (inDegree.get(node.id) ?? 0) > 0)
    .map((node) => node.id);
  if (unscheduled.length > 0) {
    logger.warn(
      `⚠️ 순환 참조로 인해 실행 순서에서 제외된 노드: ${unscheduled.join(", ")}`
    );
  }

  return levels;
}
/**
 * Runs every node of one level concurrently and records each outcome in
 * `context.nodeResults`.
 *
 * Fulfilled executions are also appended to `context.executionOrder`; a
 * rejected execution is stored as a "failed" result built from the rejection
 * reason and is not appended.
 */
private static async executeLevel(
  nodeIds: string[],
  nodes: FlowNode[],
  edges: FlowEdge[],
  context: ExecutionContext
): Promise<void> {
  logger.info(`⏳ 레벨 실행 시작: ${nodeIds.length}개 노드`);

  // allSettled: one rejecting node must not abort its siblings
  const pending = nodeIds.map((id) =>
    this.executeNode(id, nodes, edges, context)
  );
  const settled = await Promise.allSettled(pending);

  for (let i = 0; i < settled.length; i++) {
    const outcome = settled[i];
    const nodeId = nodeIds[i];

    if (outcome.status !== "fulfilled") {
      context.nodeResults.set(nodeId, {
        nodeId,
        status: "failed",
        error: outcome.reason,
        startTime: Date.now(),
        endTime: Date.now(),
      });
      continue;
    }

    context.nodeResults.set(nodeId, outcome.value);
    context.executionOrder.push(nodeId);
  }

  logger.info(`✅ 레벨 실행 완료`);
}
/**
 * Executes a single node and converts the outcome into a NodeResult.
 *
 * The node is skipped when any parent failed or was itself skipped.
 * Checking only for "failed" would stop the cascade after one generation:
 * the child of a failed node is recorded as "skipped", so a grandchild would
 * see no failed parent and run against the fallback source data.
 *
 * @param nodeId - Id of the node to run.
 * @param nodes - All nodes of the flow (used to look the node up).
 * @param edges - All edges of the flow (used to find the parents).
 * @param context - Shared execution state; parent results are read from it.
 * @returns A "success", "failed" or "skipped" result. Handler errors are
 *   captured in the result rather than thrown.
 * @throws Error when `nodeId` does not exist in `nodes`.
 */
private static async executeNode(
  nodeId: string,
  nodes: FlowNode[],
  edges: FlowEdge[],
  context: ExecutionContext
): Promise<NodeResult> {
  const startTime = Date.now();
  const node = nodes.find((n) => n.id === nodeId);
  if (!node) {
    throw new Error(`노드를 찾을 수 없습니다: ${nodeId}`);
  }
  logger.info(`🔄 노드 실행 시작: ${nodeId} (${node.type})`);

  // 1. Cascade stop: do not run below a failed or skipped parent
  const parents = this.getParentNodes(nodeId, edges);
  const parentBlocked = parents.some((parentId) => {
    const status = context.nodeResults.get(parentId)?.status;
    return status === "failed" || status === "skipped";
  });
  if (parentBlocked) {
    logger.warn(`⏭️ 노드 스킵 (부모 실패): ${nodeId}`);
    return {
      nodeId,
      status: "skipped",
      error: new Error("Parent node failed"),
      startTime,
      endTime: Date.now(),
    };
  }

  // 2. Collect the input from the parents (or the injected source data)
  const inputData = this.prepareInputData(nodeId, parents, edges, context);

  // 3. Dispatch on the node type
  try {
    const result = await this.executeNodeByType(node, inputData, context);
    logger.info(`✅ 노드 실행 성공: ${nodeId}`);
    return {
      nodeId,
      status: "success",
      data: result,
      startTime,
      endTime: Date.now(),
    };
  } catch (error) {
    logger.error(`❌ 노드 실행 실패: ${nodeId}`, error);
    return {
      nodeId,
      status: "failed",
      error: error as Error,
      startTime,
      endTime: Date.now(),
    };
  }
}
/**
 * Lists the ids of the nodes that have an edge pointing at `nodeId`,
 * in edge order (duplicates are kept when several edges share a source).
 */
private static getParentNodes(nodeId: string, edges: FlowEdge[]): string[] {
  const parentIds: string[] = [];
  for (const edge of edges) {
    if (edge.target === nodeId) {
      parentIds.push(edge.source);
    }
  }
  return parentIds;
}
/**
 * Determines the input handed to a node.
 *
 * - no parent: the injected source data
 * - one parent: that parent's result data
 * - several parents: an array with one entry per parent
 *
 * Whenever a parent has no (truthy) result data, the injected source data is
 * used in its place. `nodeId` and `edges` are currently unused.
 */
private static prepareInputData(
  nodeId: string,
  parents: string[],
  edges: FlowEdge[],
  context: ExecutionContext
): any {
  const outputOf = (parentId: string): any =>
    context.nodeResults.get(parentId)?.data || context.sourceData;

  switch (parents.length) {
    case 0:
      return context.sourceData;
    case 1:
      return outputOf(parents[0]);
    default:
      return parents.map((parentId) => outputOf(parentId));
  }
}
/**
 * Routes a node to the handler matching its type.
 *
 * Source nodes receive only the context; every other handler also receives
 * the prepared input data. "comment" and "log" nodes just write a log line.
 * Types without a handler produce a warning and a marker object instead of
 * an error.
 */
private static async executeNodeByType(
  node: FlowNode,
  inputData: any,
  context: ExecutionContext
): Promise<any> {
  const kind = node.type;

  if (kind === "tableSource") {
    return this.executeTableSource(node, context);
  }
  if (kind === "restAPISource") {
    return this.executeRestAPISource(node, context);
  }
  if (kind === "dataTransform") {
    return this.executeDataTransform(node, inputData, context);
  }
  if (kind === "insertAction") {
    return this.executeInsertAction(node, inputData, context);
  }
  if (kind === "updateAction") {
    return this.executeUpdateAction(node, inputData, context);
  }
  if (kind === "deleteAction") {
    return this.executeDeleteAction(node, inputData, context);
  }
  if (kind === "upsertAction") {
    return this.executeUpsertAction(node, inputData, context);
  }
  if (kind === "condition") {
    return this.executeCondition(node, inputData, context);
  }
  if (kind === "comment" || kind === "log") {
    // Pass-through: nothing to execute for annotations
    logger.info(`📝 ${node.type}: ${node.data.displayName || node.id}`);
    return { message: "Logged" };
  }

  logger.warn(`⚠️ 지원하지 않는 노드 타입: ${node.type}`);
  return { message: "Unsupported node type" };
}
/**
 * Executes a REST API source node: performs the configured HTTP request and
 * returns the response payload as an array of records.
 *
 * Payload extraction:
 * - without `responseMapping`, a `{ success, ..., data }` envelope is
 *   unwrapped to its `data` property;
 * - with `responseMapping` (dot-separated path), the path is followed as far
 *   as it exists; a missing segment logs a warning and stops there;
 * - an empty payload (`null` / `undefined`) yields an empty array;
 * - any other non-array payload is wrapped in a one-element array.
 *
 * @param node - Node whose `data` holds `url`, `method`, `headers`, `body`,
 *   `timeout`, `responseMapping` and `authentication`.
 * @param context - Execution context (unused).
 * @returns The extracted records.
 * @throws Error when no URL is configured, or when the request or the
 *   response handling fails (message prefixed with "REST API 호출 실패").
 */
private static async executeRestAPISource(
  node: FlowNode,
  context: ExecutionContext
): Promise<any[]> {
  const {
    url,
    method = "GET",
    headers = {},
    body,
    timeout = 30000,
    responseMapping,
    authentication,
  } = node.data;
  if (!url) {
    throw new Error("REST API URL이 설정되지 않았습니다.");
  }

  // A stored `null` bypasses the destructuring default, so normalize here
  const httpMethod = String(method || "GET");
  logger.info(`🌐 REST API 호출: ${httpMethod} ${url}`);

  try {
    const requestHeaders: any = { ...headers };

    // Authentication header
    if (authentication) {
      if (authentication.type === "bearer" && authentication.token) {
        requestHeaders["Authorization"] = `Bearer ${authentication.token}`;
      } else if (
        authentication.type === "basic" &&
        authentication.username &&
        authentication.password
      ) {
        const credentials = Buffer.from(
          `${authentication.username}:${authentication.password}`
        ).toString("base64");
        requestHeaders["Authorization"] = `Basic ${credentials}`;
      } else if (authentication.type === "apikey" && authentication.token) {
        const headerName = authentication.apiKeyHeader || "X-API-Key";
        requestHeaders[headerName] = authentication.token;
      }
    }

    // Header names are case-insensitive: do not add a second Content-Type
    // when the configuration already supplies e.g. "content-type"
    const hasContentType = Object.keys(requestHeaders).some(
      (name) => name.toLowerCase() === "content-type" && requestHeaders[name]
    );
    if (!hasContentType) {
      requestHeaders["Content-Type"] = "application/json";
    }

    const response = await axios({
      method: httpMethod.toLowerCase(),
      url,
      headers: requestHeaders,
      data: body,
      timeout,
    });
    logger.info(`✅ REST API 응답 수신: ${response.status}`);

    let responseData = response.data;

    // Auto-detect the standard envelope { success, message, data }
    if (
      !responseMapping &&
      responseData &&
      typeof responseData === "object" &&
      "success" in responseData &&
      "data" in responseData
    ) {
      logger.info("🔍 표준 API 응답 형식 감지, data 속성 자동 추출");
      responseData = responseData.data;
    }

    // Follow the configured path into the payload
    if (responseMapping && responseData) {
      logger.info(`🔍 응답 매핑 적용: ${responseMapping}`);
      for (const key of responseMapping.split(".")) {
        if (
          responseData &&
          typeof responseData === "object" &&
          key in responseData
        ) {
          responseData = responseData[key];
        } else {
          logger.warn(
            `⚠️ 응답 매핑 경로를 찾을 수 없습니다: ${responseMapping}`
          );
          break;
        }
      }
    }

    // An empty payload means "no records", not one null record
    if (responseData === null || responseData === undefined) {
      logger.warn("⚠️ REST API 응답 데이터가 비어 있습니다. 빈 배열을 반환합니다.");
      return [];
    }

    if (!Array.isArray(responseData)) {
      logger.info("🔄 단일 객체를 배열로 변환");
      responseData = [responseData];
    }
    logger.info(`📦 REST API 데이터 ${responseData.length}건 반환`);

    // Log a sample of the first record; guarded so that a null or primitive
    // first element cannot make the diagnostics fail the node
    const sample = responseData[0];
    if (sample !== null && typeof sample === "object") {
      logger.info("🔍 REST API 응답 데이터 샘플 (첫 번째 항목):", sample);
      logger.info("🔑 사용 가능한 필드명:", Object.keys(sample));
    }

    return responseData;
  } catch (error: any) {
    logger.error(`❌ REST API 호출 실패:`, error.message);
    throw new Error(`REST API 호출 실패: ${error.message}`);
  }
}
/**
 * Executes a table source node.
 *
 * Externally injected data (`context.sourceData`) takes precedence when it
 * is a non-empty array. Otherwise the configured table is queried. Without a
 * table name an empty array is returned.
 *
 * The WHERE clause is normalized so that the statement is valid whether
 * `buildWhereClause` returns the bare conditions, a complete "WHERE ..."
 * clause (which is how the UPDATE/DELETE call sites use it), or an empty
 * string.
 *
 * NOTE(review): `schema` and `tableName` are interpolated into the SQL text
 * and the query is executed without bind parameters; confirm that these
 * values can never come from untrusted input and that `buildWhereClause`
 * does not emit placeholders when called without data.
 *
 * @returns The injected rows or the rows read from the table.
 */
private static async executeTableSource(
  node: FlowNode,
  context: ExecutionContext
): Promise<any[]> {
  // Prefer externally injected data
  if (
    context.sourceData &&
    Array.isArray(context.sourceData) &&
    context.sourceData.length > 0
  ) {
    logger.info(
      `📊 외부 주입 데이터 사용: ${context.dataSourceType}, ${context.sourceData.length}`
    );
    return context.sourceData;
  }

  // No injected data: query the database
  const { tableName, schema, whereConditions } = node.data;
  if (!tableName) {
    logger.warn(
      "⚠️ 테이블 소스 노드에 테이블명이 없고, 외부 데이터도 없습니다."
    );
    return [];
  }

  const schemaPrefix = schema ? `${schema}.` : "";

  const builtConditions = whereConditions
    ? String(this.buildWhereClause(whereConditions) ?? "").trim()
    : "";
  let whereClause = "";
  if (builtConditions !== "") {
    // Avoid "WHERE WHERE ..." when the helper already includes the keyword
    whereClause = /^where\b/i.test(builtConditions)
      ? builtConditions
      : `WHERE ${builtConditions}`;
  }

  const sql = `SELECT * FROM ${schemaPrefix}${tableName} ${whereClause}`;
  const result = await query(sql, []);
  logger.info(`📊 테이블 소스 조회: ${tableName}, ${result.length}`);
  return result;
}
/**
 * Executes an INSERT action node by delegating to the implementation that
 * matches `node.data.targetType` ("internal", "external" or "api").
 * Any other value, including a missing one, is handled as "internal" after a
 * warning (backward compatibility).
 */
private static async executeInsertAction(
  node: FlowNode,
  inputData: any,
  context: ExecutionContext
): Promise<any> {
  const target = node.data.targetType;

  if (target === "external") {
    return this.executeExternalInsert(node, inputData, context);
  }
  if (target === "api") {
    return this.executeApiInsert(node, inputData, context);
  }
  if (target !== "internal") {
    // Backward compatibility: flows saved without a targetType
    logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
  }
  return this.executeInternalInsert(node, inputData, context);
}
/**
 * Inserts one row per input record into a table of the internal database,
 * all inside a single transaction.
 *
 * Each field mapping contributes one column: the value is the mapping's
 * `staticValue` when defined, otherwise the record's `sourceField`. The
 * column list is the same for every record, so the statement text is built
 * once and only the bound values change per row.
 *
 * NOTE(review): `targetTable` and `targetField` are interpolated into the
 * SQL text (identifiers cannot be bound); confirm they are validated when
 * the flow is saved.
 *
 * @param node - Node whose `data` holds `targetTable` and `fieldMappings`.
 * @param inputData - A record or an array of records.
 * @param context - Execution context (unused).
 * @returns `{ insertedCount }`.
 * @throws Error when there is data to insert but the target table or the
 *   field mappings are missing; database errors propagate from the
 *   transaction.
 */
private static async executeInternalInsert(
  node: FlowNode,
  inputData: any,
  context: ExecutionContext
): Promise<any> {
  const { targetTable, fieldMappings } = node.data;
  logger.info(`💾 INSERT 노드 실행: ${targetTable}`);

  const dataArray = Array.isArray(inputData) ? inputData : [inputData];
  logger.info(`📥 입력 데이터: ${dataArray.length}건`);

  // Diagnostics only; guarded so a null/primitive record cannot throw here
  const sample = dataArray[0];
  if (sample !== null && typeof sample === "object") {
    logger.info("🔑 입력 데이터 필드명:", Object.keys(sample));
  }

  // Report configuration problems clearly instead of failing with a
  // TypeError or a SQL syntax error. Empty input stays a no-op.
  if (dataArray.length > 0) {
    if (!targetTable) {
      throw new Error("INSERT 대상 테이블이 설정되지 않았습니다.");
    }
    if (!Array.isArray(fieldMappings) || fieldMappings.length === 0) {
      throw new Error("INSERT 필드 매핑이 설정되지 않았습니다.");
    }
  }

  return transaction(async (client) => {
    let insertedCount = 0;

    if (dataArray.length > 0) {
      // Loop-invariant: columns and placeholders do not depend on the row
      const fields: string[] = fieldMappings.map(
        (mapping: any) => mapping.targetField
      );
      const placeholders = fields.map((_, i) => `$${i + 1}`).join(", ");
      const sql = `INSERT INTO ${targetTable} (${fields.join(", ")}) VALUES (${placeholders})`;
      logger.info("📝 실행할 SQL:", sql);

      for (const data of dataArray) {
        const values: any[] = fieldMappings.map((mapping: any) =>
          mapping.staticValue !== undefined
            ? mapping.staticValue
            : data[mapping.sourceField]
        );
        await client.query(sql, values);
        insertedCount++;
      }
    }

    logger.info(
      `✅ INSERT 완료 (내부 DB): ${targetTable}, ${insertedCount}`
    );
    return { insertedCount };
  });
}
/**
 * Inserts one row per input record into a table of an external database.
 *
 * The placeholder syntax follows the database type: `:n` for Oracle, `?` for
 * MySQL/MariaDB, `@pn` for MSSQL and `$n` for everything else. After all
 * rows are sent, the commit helper is invoked. On any error the rollback
 * helper runs and the error is re-thrown. The connector is always
 * disconnected.
 *
 * @returns `{ insertedCount }`.
 * @throws Error when the connection id or the target table is missing.
 */
private static async executeExternalInsert(
  node: FlowNode,
  inputData: any,
  context: ExecutionContext
): Promise<any> {
  const {
    externalConnectionId,
    externalDbType,
    externalTargetTable,
    fieldMappings,
  } = node.data;
  if (!externalConnectionId || !externalTargetTable) {
    throw new Error("외부 DB 커넥션 또는 테이블이 설정되지 않았습니다.");
  }
  logger.info(
    `🔌 외부 DB INSERT 시작: ${externalDbType} - ${externalTargetTable}`
  );

  const connector = await this.createExternalConnector(
    externalConnectionId,
    externalDbType
  );

  try {
    const rows = Array.isArray(inputData) ? inputData : [inputData];
    let insertedCount = 0;
    const dialect = externalDbType.toLowerCase();

    // Bind-parameter syntax for the 1-based parameter position
    const placeholderAt = (position: number): string => {
      if (dialect === "oracle") {
        return `:${position}`;
      }
      if (dialect === "mysql" || dialect === "mariadb") {
        return "?";
      }
      if (dialect === "mssql") {
        return `@p${position}`;
      }
      return `$${position}`; // PostgreSQL style (default)
    };

    for (const row of rows) {
      const columns: string[] = [];
      const params: any[] = [];
      fieldMappings.forEach((mapping: any) => {
        columns.push(mapping.targetField);
        params.push(
          mapping.staticValue !== undefined
            ? mapping.staticValue
            : row[mapping.sourceField]
        );
      });

      const placeholders = columns
        .map((_, i) => placeholderAt(i + 1))
        .join(", ");
      const sql = `INSERT INTO ${externalTargetTable} (${columns.join(", ")}) VALUES (${placeholders})`;

      await connector.executeQuery(sql, params);
      insertedCount++;
    }

    // Explicit COMMIT where the helper requires it (Oracle)
    await this.commitExternalTransaction(
      connector,
      externalDbType,
      insertedCount
    );
    logger.info(
      `✅ INSERT 완료 (외부 DB): ${externalTargetTable}, ${insertedCount}`
    );
    return { insertedCount };
  } catch (error) {
    // ROLLBACK where the helper requires it (Oracle)
    await this.rollbackExternalTransaction(connector, externalDbType);
    throw error;
  } finally {
    // Always release the connection
    await connector.disconnect();
  }
}
/**
 * Sends one HTTP request per input record to the configured endpoint
 * (method defaults to POST).
 *
 * The request body is, in order of precedence: the body template with its
 * `{{variable}}` placeholders substituted, an object built from the field
 * mappings, or the record itself. The first failing request is logged and
 * re-thrown, which stops the remaining records.
 *
 * @returns `{ results }` with the status and data of every response.
 * @throws Error when no endpoint is configured.
 */
private static async executeApiInsert(
  node: FlowNode,
  inputData: any,
  context: ExecutionContext
): Promise<any> {
  const {
    apiEndpoint,
    apiMethod,
    apiAuthType,
    apiAuthConfig,
    apiHeaders,
    apiBodyTemplate,
    fieldMappings,
  } = node.data;
  if (!apiEndpoint) {
    throw new Error("API 엔드포인트가 설정되지 않았습니다.");
  }
  logger.info(`🌐 REST API INSERT 시작: ${apiMethod} ${apiEndpoint}`);

  // Fresh header object per request: configured headers + auth + default type
  const buildHeaders = (): any => {
    const result: any = { ...apiHeaders };
    switch (apiAuthType) {
      case "bearer":
        if (apiAuthConfig?.token) {
          result["Authorization"] = `Bearer ${apiAuthConfig.token}`;
        }
        break;
      case "basic":
        if (apiAuthConfig?.username && apiAuthConfig?.password) {
          const credentials = Buffer.from(
            `${apiAuthConfig.username}:${apiAuthConfig.password}`
          ).toString("base64");
          result["Authorization"] = `Basic ${credentials}`;
        }
        break;
      case "apikey":
        if (apiAuthConfig?.apiKey) {
          const headerName = apiAuthConfig.apiKeyHeader || "X-API-Key";
          result[headerName] = apiAuthConfig.apiKey;
        }
        break;
    }
    if (!result["Content-Type"]) {
      result["Content-Type"] = "application/json";
    }
    return result;
  };

  // Body: template > field mappings > whole record
  const buildBody = (record: any): any => {
    if (apiBodyTemplate) {
      return this.replaceTemplateVariables(apiBodyTemplate, record);
    }
    if (fieldMappings && fieldMappings.length > 0) {
      const mapped: any = {};
      fieldMappings.forEach((mapping: any) => {
        mapped[mapping.targetField] =
          mapping.staticValue !== undefined
            ? mapping.staticValue
            : record[mapping.sourceField];
      });
      return mapped;
    }
    return record;
  };

  const records = Array.isArray(inputData) ? inputData : [inputData];
  const results: any[] = [];

  for (const record of records) {
    const headers = buildHeaders();
    const body = buildBody(record);
    try {
      const response = await axios({
        method: apiMethod || "POST",
        url: apiEndpoint,
        headers,
        data: body,
        timeout: 30000, // 30 second timeout
      });
      results.push({
        status: response.status,
        data: response.data,
      });
    } catch (error: any) {
      logger.error(
        `❌ API 요청 실패: ${error.response?.status || error.message}`
      );
      throw error;
    }
  }

  logger.info(`✅ REST API INSERT 완료: ${results.length}`);
  return { results };
}
/**
 * Substitutes `{{variable}}` placeholders in a template with values taken
 * from `data`. Dotted names (e.g. `{{user.name}}`) address nested values.
 *
 * - `undefined` values become an empty string.
 * - Plain objects and arrays are rendered as JSON.
 * - Every other value is converted with `String(...)`.
 *
 * The substitution is a single pass with a replacer function, so inserted
 * values are taken literally: `$`-sequences in a value are not interpreted
 * as replacement patterns, and a value that itself contains `{{...}}` is
 * not substituted again.
 *
 * @param template - Text containing placeholders. A non-string value is
 *   returned unchanged.
 * @param data - Object the placeholder names are resolved against.
 * @returns The template with all placeholders substituted.
 */
private static replaceTemplateVariables(template: string, data: any): string {
  if (typeof template !== "string") {
    return template;
  }

  return template.replace(/\{\{([^}]+)\}\}/g, (placeholder: string) => {
    const key = placeholder.replace(/\{\{|\}\}/g, "").trim();
    const value = this.getNestedValue(data, key);

    if (value === undefined) {
      return "";
    }
    const isPlainObject =
      Object.prototype.toString.call(value) === "[object Object]";
    if (Array.isArray(value) || isPlainObject) {
      return JSON.stringify(value);
    }
    return String(value);
  });
}
/**
 * Reads a nested value by following a dot-separated path (e.g. "user.name").
 * Yields `undefined` as soon as a segment is missing or an intermediate
 * value is null/undefined.
 */
private static getNestedValue(obj: any, path: string): any {
  let cursor = obj;
  for (const segment of path.split(".")) {
    cursor = cursor?.[segment];
  }
  return cursor;
}
/**
 * Creates a connector for an external database (shared by the external
 * insert/update/delete handlers).
 *
 * Reads the connection row from `external_db_connections`, decrypts the
 * stored password and passes the resulting settings to
 * `DatabaseConnectorFactory.createConnector`.
 *
 * @param connectionId - Id of the row in `external_db_connections`.
 * @param dbType - Database type forwarded to the factory.
 * @returns The connector created by the factory.
 * @throws Error when no connection row exists for `connectionId`.
 */
private static async createExternalConnector(
  connectionId: number,
  dbType: string
): Promise<any> {
  const row: any = await queryOne(
    "SELECT * FROM external_db_connections WHERE id = $1",
    [connectionId]
  );
  if (!row) {
    throw new Error(`외부 DB 커넥션을 찾을 수 없습니다: ${connectionId}`);
  }

  // The password is stored encrypted
  const encryptModule = await import("../utils/encryptUtil");
  const plainPassword = encryptModule.EncryptUtil.decrypt(row.password);

  const settings = {
    host: row.host,
    port: row.port,
    database: row.database_name,
    user: row.username,
    password: plainPassword,
  };

  const factoryModule = await import("../database/DatabaseConnectorFactory");
  return await factoryModule.DatabaseConnectorFactory.createConnector(
    dbType,
    settings,
    connectionId
  );
}
/**
 * Issues an explicit COMMIT on an external connection. Only acts for Oracle
 * and only when at least one row was affected; otherwise it does nothing.
 */
private static async commitExternalTransaction(
  connector: any,
  dbType: string,
  count: number
): Promise<void> {
  const isOracle = dbType.toLowerCase() === "oracle";
  if (!isOracle || !(count > 0)) {
    return;
  }
  await connector.executeQuery("COMMIT");
  logger.info(`✅ Oracle COMMIT 실행: ${count}`);
}
/**
 * Issues a ROLLBACK on an external connection (Oracle only). A failing
 * rollback is logged and swallowed so that the caller can re-throw the
 * error that triggered it.
 */
private static async rollbackExternalTransaction(
  connector: any,
  dbType: string
): Promise<void> {
  if (dbType.toLowerCase() !== "oracle") {
    return;
  }
  try {
    await connector.executeQuery("ROLLBACK");
    logger.info(`⚠️ Oracle ROLLBACK 실행 (오류 발생)`);
  } catch (rollbackError) {
    logger.error(`❌ Oracle ROLLBACK 실패:`, rollbackError);
  }
}
/**
 * Executes an UPDATE action node by delegating to the implementation that
 * matches `node.data.targetType` ("internal", "external" or "api").
 * Any other value, including a missing one, is handled as "internal" after a
 * warning (backward compatibility).
 */
private static async executeUpdateAction(
  node: FlowNode,
  inputData: any,
  context: ExecutionContext
): Promise<any> {
  const target = node.data.targetType;

  if (target === "external") {
    return this.executeExternalUpdate(node, inputData, context);
  }
  if (target === "api") {
    return this.executeApiUpdate(node, inputData, context);
  }
  if (target !== "internal") {
    // Backward compatibility: flows saved without a targetType
    logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
  }
  return this.executeInternalUpdate(node, inputData, context);
}
/**
* Runs one UPDATE statement per input record against the internal database,
* all inside a single transaction.
*
* The SET list comes from the field mappings (the mapping's `staticValue`
* when defined, otherwise the record's `sourceField`) and uses `$n`
* placeholders. The WHERE clause text is produced by `buildWhereClause`.
*
* NOTE(review): only the SET values are pushed into `values`. If
* `buildWhereClause` emits placeholders starting at `paramIndex`, their
* values are never bound here — confirm how that helper embeds its values.
* NOTE(review): there is no guard against an empty WHERE clause, so a node
* without conditions would update every row of the table.
*
* @returns `{ updatedCount }`, the sum of `rowCount` over all statements.
*/
private static async executeInternalUpdate(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const { targetTable, fieldMappings, whereConditions } = node.data;
logger.info(`🔄 UPDATE 노드 실행: ${targetTable}`);
// Debug output: shape of the input and a sample of the first record
console.log(
"📥 입력 데이터 타입:",
typeof inputData,
Array.isArray(inputData) ? `배열(${inputData.length}건)` : "단일 객체"
);
if (inputData && inputData.length > 0) {
console.log("📄 첫 번째 입력 데이터:");
console.log(JSON.stringify(inputData[0], null, 2));
console.log("🔑 입력 데이터 필드명:", Object.keys(inputData[0]));
}
return transaction(async (client) => {
// A single record is processed as a one-element array
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
let updatedCount = 0;
for (const data of dataArray) {
const setClauses: string[] = [];
const values: any[] = [];
// 1-based index of the next `$n` placeholder
let paramIndex = 1;
console.log("🗺️ 필드 매핑 처리 중...");
fieldMappings.forEach((mapping: any) => {
// A static value wins over the value read from the record
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
console.log(
` ${mapping.sourceField}${mapping.targetField}: ${value === undefined ? "❌ undefined" : "✅ " + value}`
);
setClauses.push(`${mapping.targetField} = $${paramIndex}`);
values.push(value);
paramIndex++;
});
// The helper receives the next free placeholder index
const whereClause = this.buildWhereClause(
whereConditions,
data,
paramIndex
);
const sql = `
UPDATE ${targetTable}
SET ${setClauses.join(", ")}
${whereClause}
`;
console.log("📝 실행할 SQL:", sql);
console.log("📊 바인딩 값:", values);
const result = await client.query(sql, values);
updatedCount += result.rowCount || 0;
}
logger.info(
`✅ UPDATE 완료 (내부 DB): ${targetTable}, ${updatedCount}`
);
return { updatedCount };
});
}
/**
* Runs one UPDATE statement per input record against an external database.
*
* Placeholders follow the database type: `:n` (Oracle), `?` (MySQL/MariaDB),
* `@pn` (MSSQL) and `$n` (anything else). SET values are bound first, then
* the WHERE values, sharing one running parameter index. WHERE values are
* read from the input record by `condition.field`; "IS NULL" and
* "IS NOT NULL" conditions bind no value. Conditions are joined with AND.
*
* When no condition is configured the WHERE clause is omitted, so the
* statement updates every row of the target table.
* NOTE(review): `condition.field` and `condition.operator` are interpolated
* into the SQL text — confirm they are validated when the flow is saved.
*
* After all records, the commit helper is called; on error the rollback
* helper is called and the error re-thrown. The connector is always
* disconnected.
*
* @returns `{ updatedCount }`, summed from `rowCount` or `affectedRows`.
* @throws Error when the connection id or the target table is missing.
*/
private static async executeExternalUpdate(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const {
externalConnectionId,
externalDbType,
externalTargetTable,
fieldMappings,
whereConditions,
} = node.data;
if (!externalConnectionId || !externalTargetTable) {
throw new Error("외부 DB 커넥션 또는 테이블이 설정되지 않았습니다.");
}
logger.info(
`🔌 외부 DB UPDATE 시작: ${externalDbType} - ${externalTargetTable}`
);
// Create the external DB connector
const connector = await this.createExternalConnector(
externalConnectionId,
externalDbType
);
try {
// A single record is processed as a one-element array
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
let updatedCount = 0;
for (const data of dataArray) {
const setClauses: string[] = [];
const values: any[] = [];
// Running 1-based parameter position shared by SET and WHERE
let paramIndex = 1;
fieldMappings.forEach((mapping: any) => {
// A static value wins over the value read from the record
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
// Placeholder syntax per database type
if (externalDbType.toLowerCase() === "oracle") {
setClauses.push(`${mapping.targetField} = :${paramIndex}`);
} else if (
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
) {
setClauses.push(`${mapping.targetField} = ?`);
} else if (externalDbType.toLowerCase() === "mssql") {
setClauses.push(`${mapping.targetField} = @p${paramIndex}`);
} else {
setClauses.push(`${mapping.targetField} = $${paramIndex}`);
}
values.push(value);
paramIndex++;
});
// Build the WHERE conditions
const whereClauses: string[] = [];
whereConditions?.forEach((condition: any) => {
// The comparison value comes from the input record, keyed by the condition's field
const condValue = data[condition.field];
if (condition.operator === "IS NULL") {
whereClauses.push(`${condition.field} IS NULL`);
} else if (condition.operator === "IS NOT NULL") {
whereClauses.push(`${condition.field} IS NOT NULL`);
} else {
if (externalDbType.toLowerCase() === "oracle") {
whereClauses.push(
`${condition.field} ${condition.operator} :${paramIndex}`
);
} else if (
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
) {
whereClauses.push(`${condition.field} ${condition.operator} ?`);
} else if (externalDbType.toLowerCase() === "mssql") {
whereClauses.push(
`${condition.field} ${condition.operator} @p${paramIndex}`
);
} else {
whereClauses.push(
`${condition.field} ${condition.operator} $${paramIndex}`
);
}
// Only value-comparing conditions consume a parameter slot
values.push(condValue);
paramIndex++;
}
});
// No conditions -> no WHERE clause (the statement then affects all rows)
const whereClause =
whereClauses.length > 0 ? `WHERE ${whereClauses.join(" AND ")}` : "";
const sql = `UPDATE ${externalTargetTable} SET ${setClauses.join(", ")} ${whereClause}`;
const result = await connector.executeQuery(sql, values);
// Connectors report the affected row count under different property names
updatedCount += result.rowCount || result.affectedRows || 0;
}
// Explicit COMMIT for Oracle
await this.commitExternalTransaction(
connector,
externalDbType,
updatedCount
);
logger.info(
`✅ UPDATE 완료 (외부 DB): ${externalTargetTable}, ${updatedCount}`
);
return { updatedCount };
} catch (error) {
// ROLLBACK on error for Oracle
await this.rollbackExternalTransaction(connector, externalDbType);
throw error;
} finally {
// Always release the connection
await connector.disconnect();
}
}
/**
 * Sends one HTTP request per input record to the configured endpoint
 * (method defaults to PUT).
 *
 * The request body is, in order of precedence: the body template with its
 * `{{variable}}` placeholders substituted, an object built from the field
 * mappings, or the record itself. The first failing request is logged and
 * re-thrown, which stops the remaining records.
 *
 * @returns `{ results }` with the status and data of every response.
 * @throws Error when no endpoint is configured.
 */
private static async executeApiUpdate(
  node: FlowNode,
  inputData: any,
  context: ExecutionContext
): Promise<any> {
  const {
    apiEndpoint,
    apiMethod,
    apiAuthType,
    apiAuthConfig,
    apiHeaders,
    apiBodyTemplate,
    fieldMappings,
  } = node.data;
  if (!apiEndpoint) {
    throw new Error("API 엔드포인트가 설정되지 않았습니다.");
  }
  logger.info(`🌐 REST API UPDATE 시작: ${apiMethod} ${apiEndpoint}`);

  // Fresh header object per request: configured headers + auth + default type
  const buildHeaders = (): any => {
    const result: any = { ...apiHeaders };
    switch (apiAuthType) {
      case "bearer":
        if (apiAuthConfig?.token) {
          result["Authorization"] = `Bearer ${apiAuthConfig.token}`;
        }
        break;
      case "basic":
        if (apiAuthConfig?.username && apiAuthConfig?.password) {
          const credentials = Buffer.from(
            `${apiAuthConfig.username}:${apiAuthConfig.password}`
          ).toString("base64");
          result["Authorization"] = `Basic ${credentials}`;
        }
        break;
      case "apikey":
        if (apiAuthConfig?.apiKey) {
          const headerName = apiAuthConfig.apiKeyHeader || "X-API-Key";
          result[headerName] = apiAuthConfig.apiKey;
        }
        break;
    }
    if (!result["Content-Type"]) {
      result["Content-Type"] = "application/json";
    }
    return result;
  };

  // Body: template > field mappings > whole record
  const buildBody = (record: any): any => {
    if (apiBodyTemplate) {
      return this.replaceTemplateVariables(apiBodyTemplate, record);
    }
    if (fieldMappings && fieldMappings.length > 0) {
      const mapped: any = {};
      fieldMappings.forEach((mapping: any) => {
        mapped[mapping.targetField] =
          mapping.staticValue !== undefined
            ? mapping.staticValue
            : record[mapping.sourceField];
      });
      return mapped;
    }
    return record;
  };

  const records = Array.isArray(inputData) ? inputData : [inputData];
  const results: any[] = [];

  for (const record of records) {
    const headers = buildHeaders();
    const body = buildBody(record);
    try {
      const response = await axios({
        method: apiMethod || "PUT",
        url: apiEndpoint,
        headers,
        data: body,
        timeout: 30000,
      });
      results.push({
        status: response.status,
        data: response.data,
      });
    } catch (error: any) {
      logger.error(
        `❌ API 요청 실패: ${error.response?.status || error.message}`
      );
      throw error;
    }
  }

  logger.info(`✅ REST API UPDATE 완료: ${results.length}`);
  return { results };
}
/**
 * Executes a DELETE action node by delegating to the implementation that
 * matches `node.data.targetType` ("internal", "external" or "api").
 * Any other value, including a missing one, is handled as "internal" after a
 * warning (backward compatibility).
 */
private static async executeDeleteAction(
  node: FlowNode,
  inputData: any,
  context: ExecutionContext
): Promise<any> {
  const target = node.data.targetType;

  if (target === "external") {
    return this.executeExternalDelete(node, inputData, context);
  }
  if (target === "api") {
    return this.executeApiDelete(node, inputData, context);
  }
  if (target !== "internal") {
    // Backward compatibility: flows saved without a targetType
    logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
  }
  return this.executeInternalDelete(node, inputData, context);
}
/**
 * Runs one DELETE statement per input record against the internal database,
 * all inside a single transaction.
 *
 * A statement without a WHERE clause is refused, mirroring the guard in
 * `executeExternalDelete`: a node without conditions must not wipe the
 * whole table. The error is thrown inside the transaction callback, so
 * statements already executed for earlier records are expected to be rolled
 * back by `transaction` (confirm its semantics).
 *
 * NOTE(review): the statement is executed with an empty parameter list. If
 * `buildWhereClause` emits `$n` placeholders, their values are never bound
 * here — confirm how that helper embeds its values.
 *
 * @param node - Node whose `data` holds `targetTable` and `whereConditions`.
 * @param inputData - A record or an array of records.
 * @param context - Execution context (unused).
 * @returns `{ deletedCount }`, the sum of `rowCount` over all statements.
 * @throws Error when the WHERE clause for a record is empty.
 */
private static async executeInternalDelete(
  node: FlowNode,
  inputData: any,
  context: ExecutionContext
): Promise<any> {
  const { targetTable, whereConditions } = node.data;
  logger.info(`🗑️ DELETE 노드 실행: ${targetTable}`);

  const dataArray = Array.isArray(inputData) ? inputData : [inputData];
  logger.info(`📥 입력 데이터: ${dataArray.length}건`);

  // Diagnostics only; guarded so a null/primitive record cannot throw here
  const sample = dataArray[0];
  if (sample !== null && typeof sample === "object") {
    logger.info("🔑 입력 데이터 필드명:", Object.keys(sample));
  }

  return transaction(async (client) => {
    let deletedCount = 0;

    for (const data of dataArray) {
      const whereClause = this.buildWhereClause(whereConditions, data, 1);

      // Never issue an unconditional DELETE
      if (!whereClause || String(whereClause).trim() === "") {
        throw new Error(
          "DELETE 작업에 WHERE 조건이 필요합니다. (전체 삭제 방지)"
        );
      }

      const sql = `DELETE FROM ${targetTable} ${whereClause}`;
      logger.info("📝 실행할 SQL:", sql);
      const result = await client.query(sql, []);
      deletedCount += result.rowCount || 0;
    }

    logger.info(
      `✅ DELETE 완료 (내부 DB): ${targetTable}, ${deletedCount}`
    );
    return { deletedCount };
  });
}
/**
* Runs one DELETE statement per input record against an external database.
*
* WHERE values are read from the input record by `condition.field`;
* "IS NULL" and "IS NOT NULL" conditions bind no value. Placeholders follow
* the database type: `:n` (Oracle), `?` (MySQL/MariaDB), `@pn` (MSSQL) and
* `$n` (anything else). Conditions are joined with AND.
*
* A record that yields no WHERE condition aborts the operation with an
* error, so that the whole table can never be deleted by accident.
* NOTE(review): `condition.field` and `condition.operator` are interpolated
* into the SQL text — confirm they are validated when the flow is saved.
*
* After all records, the commit helper is called; on error the rollback
* helper is called and the error re-thrown. The connector is always
* disconnected.
*
* @returns `{ deletedCount }`, summed from `rowCount` or `affectedRows`.
* @throws Error when the connection id or the target table is missing, or
*   when no WHERE condition is configured.
*/
private static async executeExternalDelete(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const {
externalConnectionId,
externalDbType,
externalTargetTable,
whereConditions,
} = node.data;
if (!externalConnectionId || !externalTargetTable) {
throw new Error("외부 DB 커넥션 또는 테이블이 설정되지 않았습니다.");
}
logger.info(
`🔌 외부 DB DELETE 시작: ${externalDbType} - ${externalTargetTable}`
);
// Create the external DB connector
const connector = await this.createExternalConnector(
externalConnectionId,
externalDbType
);
try {
// A single record is processed as a one-element array
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
let deletedCount = 0;
for (const data of dataArray) {
const whereClauses: string[] = [];
const values: any[] = [];
// 1-based position of the next bind parameter
let paramIndex = 1;
// Build the WHERE conditions
whereConditions?.forEach((condition: any) => {
// The comparison value comes from the input record, keyed by the condition's field
const condValue = data[condition.field];
if (condition.operator === "IS NULL") {
whereClauses.push(`${condition.field} IS NULL`);
} else if (condition.operator === "IS NOT NULL") {
whereClauses.push(`${condition.field} IS NOT NULL`);
} else {
if (externalDbType.toLowerCase() === "oracle") {
whereClauses.push(
`${condition.field} ${condition.operator} :${paramIndex}`
);
} else if (
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
) {
whereClauses.push(`${condition.field} ${condition.operator} ?`);
} else if (externalDbType.toLowerCase() === "mssql") {
whereClauses.push(
`${condition.field} ${condition.operator} @p${paramIndex}`
);
} else {
whereClauses.push(
`${condition.field} ${condition.operator} $${paramIndex}`
);
}
// Only value-comparing conditions consume a parameter slot
values.push(condValue);
paramIndex++;
}
});
const whereClause =
whereClauses.length > 0 ? `WHERE ${whereClauses.join(" AND ")}` : "";
// Safety guard: refuse to delete without any condition
if (!whereClause) {
throw new Error(
"DELETE 작업에 WHERE 조건이 필요합니다. (전체 삭제 방지)"
);
}
const sql = `DELETE FROM ${externalTargetTable} ${whereClause}`;
const result = await connector.executeQuery(sql, values);
// Connectors report the affected row count under different property names
deletedCount += result.rowCount || result.affectedRows || 0;
}
// Explicit COMMIT for Oracle
await this.commitExternalTransaction(
connector,
externalDbType,
deletedCount
);
logger.info(
`✅ DELETE 완료 (외부 DB): ${externalTargetTable}, ${deletedCount}`
);
return { deletedCount };
} catch (error) {
// ROLLBACK on error for Oracle
await this.rollbackExternalTransaction(connector, externalDbType);
throw error;
} finally {
// Always release the connection
await connector.disconnect();
}
}
/**
 * Sends one HTTP DELETE request per input row to a REST endpoint.
 *
 * The endpoint may contain template variables (e.g. `/api/users/{{id}}`)
 * which are substituted from each row via `replaceTemplateVariables`.
 * Requests are sent sequentially; the first failing request is logged and
 * re-thrown, so later rows are not processed.
 *
 * @param node - DELETE action node; reads `apiEndpoint`, `apiAuthType`,
 *   `apiAuthConfig` and `apiHeaders` from `node.data`.
 * @param inputData - A single row or an array of rows.
 * @param context - Execution context (not read by this method).
 * @returns `{ results }` with the HTTP status and response body of each request.
 * @throws Error when no endpoint is configured; re-throws any axios error.
 */
private static async executeApiDelete(
  node: FlowNode,
  inputData: any,
  context: ExecutionContext
): Promise<any> {
  const { apiEndpoint, apiAuthType, apiAuthConfig, apiHeaders } = node.data;

  if (!apiEndpoint) {
    throw new Error("API 엔드포인트가 설정되지 않았습니다.");
  }

  logger.info(`🌐 REST API DELETE 시작: ${apiEndpoint}`);

  // Builds a fresh header object (custom headers + auth header) per request.
  const buildHeaders = (): any => {
    const requestHeaders: any = { ...apiHeaders };

    switch (apiAuthType) {
      case "bearer":
        if (apiAuthConfig?.token) {
          requestHeaders["Authorization"] = `Bearer ${apiAuthConfig.token}`;
        }
        break;
      case "basic":
        if (apiAuthConfig?.username && apiAuthConfig?.password) {
          const encoded = Buffer.from(
            `${apiAuthConfig.username}:${apiAuthConfig.password}`
          ).toString("base64");
          requestHeaders["Authorization"] = `Basic ${encoded}`;
        }
        break;
      case "apikey":
        if (apiAuthConfig?.apiKey) {
          const keyHeader = apiAuthConfig.apiKeyHeader || "X-API-Key";
          requestHeaders[keyHeader] = apiAuthConfig.apiKey;
        }
        break;
    }

    return requestHeaders;
  };

  const rows = Array.isArray(inputData) ? inputData : [inputData];
  const results: any[] = [];

  for (const row of rows) {
    const headers = buildHeaders();

    // DELETE usually identifies the resource through the URL, so the row is
    // only used to fill template variables in the endpoint.
    const url = this.replaceTemplateVariables(apiEndpoint, row);

    try {
      const { status, data } = await axios({
        method: "DELETE",
        url,
        headers,
        timeout: 30000,
      });
      results.push({ status, data });
    } catch (error: any) {
      logger.error(
        `❌ API 요청 실패: ${error.response?.status || error.message}`
      );
      throw error;
    }
  }

  logger.info(`✅ REST API DELETE 완료: ${results.length}`);
  return { results };
}
/**
 * Dispatches an UPSERT action node to the executor matching its target type.
 *
 * `"external"` and `"api"` go to their dedicated executors. `"internal"`
 * goes to the internal executor, and so does any other value (including a
 * missing `targetType`) after a warning is logged, for backward compatibility.
 *
 * @param node - UPSERT action node; reads `targetType` from `node.data`.
 * @param inputData - Row or rows forwarded unchanged to the executor.
 * @param context - Execution context forwarded unchanged to the executor.
 * @returns Whatever the selected executor resolves to.
 */
private static async executeUpsertAction(
  node: FlowNode,
  inputData: any,
  context: ExecutionContext
): Promise<any> {
  const { targetType } = node.data;

  if (targetType === "external") {
    return this.executeExternalUpsert(node, inputData, context);
  }

  if (targetType === "api") {
    return this.executeApiUpsert(node, inputData, context);
  }

  if (targetType !== "internal") {
    // Backward compatibility: nodes without a target type are treated as internal.
    logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
  }

  return this.executeInternalUpsert(node, inputData, context);
}
/**
 * Upserts rows into an internal table without relying on a database
 * constraint: for every input row it runs a SELECT on the conflict keys and
 * then either an UPDATE (row found) or an INSERT (row not found). All
 * statements run inside a single `transaction(...)` callback.
 *
 * A mapping's `staticValue` takes precedence over the row's `sourceField`
 * value whenever it is not `undefined`.
 *
 * When a matching row exists but every mapped field is a conflict key there
 * is nothing to SET; the UPDATE is skipped (it would otherwise be the
 * invalid statement `UPDATE t SET  WHERE ...`) and the row is still counted
 * in `updatedCount`.
 *
 * @param node - UPSERT action node; reads `targetTable`, `fieldMappings` and
 *   `conflictKeys` from `node.data`.
 * @param inputData - A single row or an array of rows.
 * @param context - Execution context (not read by this method).
 * @returns `{ insertedCount, updatedCount, totalCount }`.
 * @throws Error when the table, the mappings or the conflict keys are missing.
 */
private static async executeInternalUpsert(
  node: FlowNode,
  inputData: any,
  context: ExecutionContext
): Promise<any> {
  const { targetTable, fieldMappings, conflictKeys } = node.data;

  if (!targetTable || !fieldMappings || fieldMappings.length === 0) {
    throw new Error("UPSERT 액션에 필수 설정이 누락되었습니다.");
  }
  if (!conflictKeys || conflictKeys.length === 0) {
    throw new Error("UPSERT 액션에 충돌 키(Conflict Keys)가 필요합니다.");
  }

  logger.info(`🔀 UPSERT 노드 실행: ${targetTable}`);
  console.log(
    "📥 입력 데이터 타입:",
    typeof inputData,
    Array.isArray(inputData) ? `배열(${inputData.length}건)` : "단일 객체"
  );
  if (inputData && inputData.length > 0) {
    console.log("📄 첫 번째 입력 데이터:");
    console.log(JSON.stringify(inputData[0], null, 2));
    console.log("🔑 입력 데이터 필드명:", Object.keys(inputData[0]));
  }
  console.log("🔑 충돌 키:", conflictKeys);

  // Value a mapping writes for one row: the static value wins when defined.
  const resolveValue = (mapping: any, row: any): any =>
    mapping.staticValue !== undefined
      ? mapping.staticValue
      : row[mapping.sourceField];

  // Loop-invariant: only non-key fields are written by the UPDATE branch.
  const updatableMappings: any[] = fieldMappings.filter(
    (mapping: any) => !conflictKeys.includes(mapping.targetField)
  );

  // Loop-invariant: predicate used by the existence check ($1..$k).
  const whereConditions = conflictKeys
    .map((key: string, index: number) => `${key} = $${index + 1}`)
    .join(" AND ");

  return transaction(async (client) => {
    const dataArray = Array.isArray(inputData) ? inputData : [inputData];
    let insertedCount = 0;
    let updatedCount = 0;

    for (const data of dataArray) {
      // 1. Extract the conflict-key values for this row. A key without a
      //    mapping stays undefined, exactly as before.
      const conflictKeyValues: Record<string, any> = {};
      conflictKeys.forEach((key: string) => {
        const mapping = fieldMappings.find((m: any) => m.targetField === key);
        if (mapping) {
          conflictKeyValues[key] = resolveValue(mapping, data);
        }
      });
      const whereValues = conflictKeys.map(
        (key: string) => conflictKeyValues[key]
      );

      // 2. Existence check.
      console.log("🔍 존재 여부 확인 - WHERE 조건:", whereConditions);
      console.log("🔍 존재 여부 확인 - 바인딩 값:", whereValues);
      const checkSql = `SELECT 1 FROM ${targetTable} WHERE ${whereConditions} LIMIT 1`;
      const existingRow = await client.query(checkSql, whereValues);

      if (existingRow.rows.length > 0) {
        // 3-A. Row exists: UPDATE the non-key fields.
        if (updatableMappings.length === 0) {
          // Nothing to SET; running the statement would be a syntax error.
          logger.info(
            `⏭️ UPDATE 생략 (충돌 키 외 변경할 필드 없음): ${targetTable}`
          );
          updatedCount++;
          continue;
        }

        const setClauses = updatableMappings.map(
          (mapping: any, index: number) =>
            `${mapping.targetField} = $${index + 1}`
        );
        const updateValues: any[] = updatableMappings.map((mapping: any) =>
          resolveValue(mapping, data)
        );

        // WHERE placeholders continue numbering after the SET placeholders.
        const firstWhereIndex = updatableMappings.length + 1;
        const updateWhereConditions = conflictKeys
          .map(
            (key: string, index: number) =>
              `${key} = $${firstWhereIndex + index}`
          )
          .join(" AND ");
        updateValues.push(...whereValues);

        const updateSql = `UPDATE ${targetTable} SET ${setClauses.join(", ")} WHERE ${updateWhereConditions}`;
        logger.info(`🔄 UPDATE 실행:`, {
          table: targetTable,
          conflictKeys,
          conflictKeyValues,
          sql: updateSql,
          values: updateValues,
        });
        await client.query(updateSql, updateValues);
        updatedCount++;
      } else {
        // 3-B. Row does not exist: INSERT every mapped field.
        const columns: string[] = fieldMappings.map(
          (mapping: any) => mapping.targetField
        );
        const values: any[] = fieldMappings.map((mapping: any) =>
          resolveValue(mapping, data)
        );
        const placeholders = values.map((_, i) => `$${i + 1}`).join(", ");
        const insertSql = `INSERT INTO ${targetTable} (${columns.join(", ")}) VALUES (${placeholders})`;
        logger.info(` INSERT 실행:`, {
          table: targetTable,
          conflictKeys,
          conflictKeyValues,
        });
        await client.query(insertSql, values);
        insertedCount++;
      }
    }

    logger.info(
      `✅ UPSERT 완료 (내부 DB): ${targetTable}, INSERT ${insertedCount}건, UPDATE ${updatedCount}`
    );
    return {
      insertedCount,
      updatedCount,
      totalCount: insertedCount + updatedCount,
    };
  });
}
/**
 * Upserts rows into an external database using application logic instead of
 * a native UPSERT: for every input row it runs an existence check on the
 * conflict keys and then either an UPDATE or an INSERT through the connector.
 *
 * Dialect handling, selected by `externalDbType` (case-insensitive):
 * - placeholders: `:n` for oracle, `?` for mysql/mariadb, `@pn` for mssql,
 *   `$n` for anything else;
 * - single-row existence check: `ROWNUM <= 1` for oracle, `TOP 1` for mssql
 *   and `LIMIT 1` for every other type. Oracle and SQL Server do not accept
 *   a `LIMIT` clause, so a single `LIMIT 1` form fails on them.
 *
 * A mapping's `staticValue` takes precedence over the row's `sourceField`
 * value whenever it is not `undefined`. When a matching row exists but every
 * mapped field is a conflict key there is nothing to SET; the UPDATE is
 * skipped (it would be invalid SQL) and the row is still counted in
 * `updatedCount`.
 *
 * @param node - UPSERT action node; reads `externalConnectionId`,
 *   `externalDbType`, `externalTargetTable`, `fieldMappings` and
 *   `conflictKeys` from `node.data`.
 * @param inputData - A single row or an array of rows.
 * @param context - Execution context (not read by this method).
 * @returns `{ insertedCount, updatedCount, totalCount }`.
 * @throws Error when the connection, table, mappings or conflict keys are
 *   missing; re-throws connector errors after calling the rollback helper.
 */
private static async executeExternalUpsert(
  node: FlowNode,
  inputData: any,
  context: ExecutionContext
): Promise<any> {
  const {
    externalConnectionId,
    externalDbType,
    externalTargetTable,
    fieldMappings,
    conflictKeys,
  } = node.data;

  if (!externalConnectionId || !externalTargetTable) {
    throw new Error("외부 DB 커넥션 또는 테이블이 설정되지 않았습니다.");
  }
  if (!fieldMappings || fieldMappings.length === 0) {
    throw new Error("UPSERT 액션에 필수 설정이 누락되었습니다.");
  }
  if (!conflictKeys || conflictKeys.length === 0) {
    throw new Error("UPSERT 액션에 충돌 키(Conflict Keys)가 필요합니다.");
  }

  logger.info(
    `🔌 외부 DB UPSERT 시작: ${externalDbType} - ${externalTargetTable}`
  );

  // Normalised once; an unset type falls through to the `$n` default below.
  const dialect = String(externalDbType ?? "").toLowerCase();

  // Bind placeholder for the 1-based parameter position in this dialect.
  const placeholder = (position: number): string => {
    switch (dialect) {
      case "oracle":
        return `:${position}`;
      case "mysql":
      case "mariadb":
        return "?";
      case "mssql":
        return `@p${position}`;
      default:
        return `$${position}`;
    }
  };

  // Single-row existence query in the dialect's row-limiting syntax.
  const buildExistsSql = (predicate: string): string => {
    switch (dialect) {
      case "oracle":
        return `SELECT * FROM ${externalTargetTable} WHERE (${predicate}) AND ROWNUM <= 1`;
      case "mssql":
        return `SELECT TOP 1 * FROM ${externalTargetTable} WHERE ${predicate}`;
      default:
        return `SELECT * FROM ${externalTargetTable} WHERE ${predicate} LIMIT 1`;
    }
  };

  // Value a mapping writes for one row: the static value wins when defined.
  const resolveValue = (mapping: any, row: any): any =>
    mapping.staticValue !== undefined
      ? mapping.staticValue
      : row[mapping.sourceField];

  // Loop-invariant: only non-key fields are written by the UPDATE branch.
  const updatableMappings: any[] = fieldMappings.filter(
    (mapping: any) => !conflictKeys.includes(mapping.targetField)
  );

  // Open the connector for the configured external database.
  const connector = await this.createExternalConnector(
    externalConnectionId,
    externalDbType
  );

  try {
    const dataArray = Array.isArray(inputData) ? inputData : [inputData];
    let insertedCount = 0;
    let updatedCount = 0;

    for (const data of dataArray) {
      // 1. Extract the conflict-key values for this row. A key without a
      //    mapping stays undefined, exactly as before.
      const conflictKeyValues: Record<string, any> = {};
      conflictKeys.forEach((key: string) => {
        const mapping = fieldMappings.find((m: any) => m.targetField === key);
        if (mapping) {
          conflictKeyValues[key] = resolveValue(mapping, data);
        }
      });
      const whereValues: any[] = conflictKeys.map(
        (key: string) => conflictKeyValues[key]
      );

      // 2. Existence check on the conflict keys.
      const checkPredicate = conflictKeys
        .map((key: string, index: number) => `${key} = ${placeholder(index + 1)}`)
        .join(" AND ");
      const existingRow = await connector.executeQuery(
        buildExistsSql(checkPredicate),
        whereValues
      );
      // Connectors return either `{ rows: [...] }` or a bare array.
      const hasExistingRow =
        existingRow.rows?.length > 0 || existingRow.length > 0;

      if (hasExistingRow) {
        // 3-A. Row exists: UPDATE the non-key fields.
        if (updatableMappings.length === 0) {
          // Nothing to SET; running the statement would be a syntax error.
          logger.info(
            `⏭️ UPDATE 생략 (충돌 키 외 변경할 필드 없음): ${externalTargetTable}`
          );
          updatedCount++;
          continue;
        }

        const setClauses = updatableMappings.map(
          (mapping: any, index: number) =>
            `${mapping.targetField} = ${placeholder(index + 1)}`
        );
        const updateValues: any[] = updatableMappings.map((mapping: any) =>
          resolveValue(mapping, data)
        );

        // WHERE placeholders continue numbering after the SET placeholders.
        const firstWhereIndex = updatableMappings.length + 1;
        const updateWhereClauses = conflictKeys.map(
          (key: string, index: number) =>
            `${key} = ${placeholder(firstWhereIndex + index)}`
        );
        updateValues.push(...whereValues);

        const updateSql = `UPDATE ${externalTargetTable} SET ${setClauses.join(", ")} WHERE ${updateWhereClauses.join(" AND ")}`;
        await connector.executeQuery(updateSql, updateValues);
        updatedCount++;
      } else {
        // 3-B. Row does not exist: INSERT every mapped field.
        const columns: string[] = fieldMappings.map(
          (mapping: any) => mapping.targetField
        );
        const values: any[] = fieldMappings.map((mapping: any) =>
          resolveValue(mapping, data)
        );
        const placeholders = columns
          .map((_, index) => placeholder(index + 1))
          .join(", ");
        const insertSql = `INSERT INTO ${externalTargetTable} (${columns.join(", ")}) VALUES (${placeholders})`;
        await connector.executeQuery(insertSql, values);
        insertedCount++;
      }
    }

    // Delegates the commit decision to the shared helper.
    await this.commitExternalTransaction(
      connector,
      externalDbType,
      insertedCount + updatedCount
    );

    logger.info(
      `✅ UPSERT 완료 (외부 DB): ${externalTargetTable}, INSERT ${insertedCount}건, UPDATE ${updatedCount}`
    );
    return {
      insertedCount,
      updatedCount,
      totalCount: insertedCount + updatedCount,
    };
  } catch (error) {
    // Delegates the rollback decision to the shared helper, then re-raises.
    await this.rollbackExternalTransaction(connector, externalDbType);
    throw error;
  } finally {
    await connector.disconnect();
  }
}
/**
 * Sends one HTTP request per input row to a REST endpoint to upsert it.
 *
 * The HTTP method is `apiMethod`, defaulting to PUT. The request body is, in
 * order of precedence: `apiBodyTemplate` with template variables substituted
 * from the row, an object built from `fieldMappings` (a mapping's
 * `staticValue` wins over the row value when it is not `undefined`), or the
 * row itself. `Content-Type: application/json` is added only when the
 * configured headers carry no non-empty content type under any casing.
 *
 * Requests are sent sequentially; the first failing request is logged and
 * re-thrown, so later rows are not processed.
 *
 * @param node - UPSERT action node; reads `apiEndpoint`, `apiMethod`,
 *   `apiAuthType`, `apiAuthConfig`, `apiHeaders`, `apiBodyTemplate` and
 *   `fieldMappings` from `node.data`.
 * @param inputData - A single row or an array of rows.
 * @param context - Execution context (not read by this method).
 * @returns `{ results }` with the HTTP status and response body of each request.
 * @throws Error when no endpoint is configured; re-throws any axios error.
 */
private static async executeApiUpsert(
  node: FlowNode,
  inputData: any,
  context: ExecutionContext
): Promise<any> {
  const {
    apiEndpoint,
    apiMethod,
    apiAuthType,
    apiAuthConfig,
    apiHeaders,
    apiBodyTemplate,
    fieldMappings,
  } = node.data;

  if (!apiEndpoint) {
    throw new Error("API 엔드포인트가 설정되지 않았습니다.");
  }

  // Resolve the effective method once so the log shows what is actually sent
  // (previously an unset apiMethod was logged as "undefined").
  const method = apiMethod || "PUT";
  logger.info(`🌐 REST API UPSERT 시작: ${method} ${apiEndpoint}`);

  const dataArray = Array.isArray(inputData) ? inputData : [inputData];
  const results: any[] = [];

  for (const data of dataArray) {
    // Start from the configured custom headers.
    const headers: any = { ...apiHeaders };

    // Add the authentication header for the configured scheme.
    if (apiAuthType === "bearer" && apiAuthConfig?.token) {
      headers["Authorization"] = `Bearer ${apiAuthConfig.token}`;
    } else if (
      apiAuthType === "basic" &&
      apiAuthConfig?.username &&
      apiAuthConfig?.password
    ) {
      const credentials = Buffer.from(
        `${apiAuthConfig.username}:${apiAuthConfig.password}`
      ).toString("base64");
      headers["Authorization"] = `Basic ${credentials}`;
    } else if (apiAuthType === "apikey" && apiAuthConfig?.apiKey) {
      const headerName = apiAuthConfig.apiKeyHeader || "X-API-Key";
      headers[headerName] = apiAuthConfig.apiKey;
    }

    // HTTP header names are case-insensitive: do not add a second,
    // conflicting content type when one such as "content-type" is configured.
    const hasContentType = Object.keys(headers).some(
      (name) => name.toLowerCase() === "content-type" && headers[name]
    );
    if (!hasContentType) {
      headers["Content-Type"] = "application/json";
    }

    // Build the request body.
    let body: any;
    if (apiBodyTemplate) {
      body = this.replaceTemplateVariables(apiBodyTemplate, data);
    } else if (fieldMappings && fieldMappings.length > 0) {
      body = {};
      fieldMappings.forEach((mapping: any) => {
        body[mapping.targetField] =
          mapping.staticValue !== undefined
            ? mapping.staticValue
            : data[mapping.sourceField];
      });
    } else {
      body = data;
    }

    try {
      const response = await axios({
        method,
        url: apiEndpoint,
        headers,
        data: body,
        timeout: 30000,
      });
      results.push({
        status: response.status,
        data: response.data,
      });
    } catch (error: any) {
      logger.error(
        `❌ API 요청 실패: ${error.response?.status || error.message}`
      );
      throw error;
    }
  }

  logger.info(`✅ REST API UPSERT 완료: ${results.length}`);
  return { results };
}
/**
 * Evaluates a condition node against the input data.
 *
 * Each condition compares `inputData[condition.field]` with either a fixed
 * value (`condition.value`) or, when `condition.valueType === "field"`, the
 * value of another field (`inputData[condition.value]`). Individual results
 * are combined with OR when `logic === "OR"` and with AND otherwise.
 *
 * @param node - Condition node; reads `conditions` and `logic` from `node.data`.
 * @param inputData - Object whose fields are compared.
 * @param context - Execution context (not read by this method).
 * @returns The combined boolean outcome.
 */
private static async executeCondition(
  node: FlowNode,
  inputData: any,
  context: ExecutionContext
): Promise<boolean> {
  const { conditions, logic } = node.data;

  const outcomes = conditions.map((condition: any) => {
    const { field, operator, value, valueType } = condition;
    const actual = inputData[field];

    // "field" compares against another field of the same input; anything
    // else compares against the configured fixed value.
    const comparesAgainstField = valueType === "field";
    const expected = comparesAgainstField ? inputData[value] : value;

    if (comparesAgainstField) {
      logger.info(
        `🔄 필드 참조 비교: ${field} (${actual}) vs ${value} (${expected})`
      );
    } else {
      logger.info(`📊 고정값 비교: ${field} (${actual}) vs ${expected}`);
    }

    return this.evaluateCondition(actual, operator, expected);
  });

  let verdict: boolean;
  if (logic === "OR") {
    verdict = outcomes.some((passed: boolean) => passed);
  } else {
    verdict = outcomes.every((passed: boolean) => passed);
  }

  logger.info(`🔍 조건 평가 결과: ${verdict} (${logic} 로직)`);
  return verdict;
}
/**
 * Builds a `WHERE` clause with `$n` positional placeholders, one per
 * condition, joined with AND.
 *
 * Only the SQL text is produced. No bind values are returned, so the caller
 * must supply one value per condition, in condition order.
 *
 * NOTE(review): operators that take no operand (e.g. `IS NULL`) still get a
 * placeholder appended here, which is not valid SQL; confirm whether callers
 * can pass such operators.
 *
 * @param conditions - Conditions providing `field` and `operator`.
 * @param data - Kept for signature compatibility; not read by this method.
 * @param startIndex - Number used for the first placeholder (default 1).
 * @returns The clause including the `WHERE` keyword, or `""` when there are
 *   no conditions.
 */
private static buildWhereClause(
  conditions: any[],
  data?: any,
  startIndex: number = 1
): string {
  if (!conditions || conditions.length === 0) {
    return "";
  }

  const clauses = conditions.map(
    (condition, index) =>
      `${condition.field} ${condition.operator} $${startIndex + index}`
  );

  return `WHERE ${clauses.join(" AND ")}`;
}
/**
 * Evaluates a single comparison between a field value and an expected value.
 *
 * Supported operators: `equals` / `=`, `notEquals` / `!=` (strict equality,
 * no type coercion), `greaterThan` / `>`, `lessThan` / `<`, and `contains`
 * (substring test on the string forms of both operands). Any other operator
 * evaluates to `false`.
 *
 * @param fieldValue - Value taken from the input data.
 * @param operator - Operator name or symbol.
 * @param expectedValue - Value to compare against.
 * @returns The result of the comparison.
 */
private static evaluateCondition(
  fieldValue: any,
  operator: string,
  expectedValue: any
): boolean {
  switch (operator) {
    case "equals":
    case "=":
      return fieldValue === expectedValue;
    case "notEquals":
    case "!=":
      return fieldValue !== expectedValue;
    case "greaterThan":
    case ">":
      return fieldValue > expectedValue;
    case "lessThan":
    case "<":
      return fieldValue < expectedValue;
    case "contains":
      // String(undefined) is "undefined" and String(null) is "null"; without
      // this guard a missing field would "contain" fragments such as "fine"
      // or "null".
      if (fieldValue == null || expectedValue == null) {
        return false;
      }
      return String(fieldValue).includes(String(expectedValue));
    default:
      return false;
  }
}
/**
 * Assembles the final execution report for a flow run.
 *
 * Produces one summary per node from `context.nodeResults` (nodes without a
 * recorded result are reported as `"pending"`), counts nodes per status and
 * marks the run successful when no node failed. Failed nodes are logged in
 * detail.
 *
 * @param nodes - All nodes of the executed flow.
 * @param context - Execution context holding the per-node results.
 * @param executionTime - Elapsed time, copied into the result unchanged.
 * @returns The execution result with message, per-node summaries and counts.
 */
private static generateExecutionResult(
  nodes: FlowNode[],
  context: ExecutionContext,
  executionTime: number
): ExecutionResult {
  const nodeSummaries: NodeExecutionSummary[] = [];

  for (const node of nodes) {
    const recorded = context.nodeResults.get(node.id);

    // Duration is only known once an end time has been recorded.
    let duration: number | undefined;
    if (recorded && recorded.endTime) {
      duration = recorded.endTime - recorded.startTime;
    }

    nodeSummaries.push({
      nodeId: node.id,
      nodeName: node.data.displayName || node.id,
      nodeType: node.type,
      status: (recorded && recorded.status) || "pending",
      duration,
      error: recorded?.error?.message,
    });
  }

  const countWithStatus = (status: string): number =>
    nodeSummaries.filter((entry) => entry.status === status).length;

  const summary = {
    total: nodes.length,
    success: countWithStatus("success"),
    failed: countWithStatus("failed"),
    skipped: countWithStatus("skipped"),
  };

  const success = summary.failed === 0;

  let message: string;
  if (success) {
    message = `플로우 실행 성공 (${summary.success}/${summary.total})`;
  } else {
    // Log the details of every failed node.
    const failureDetails = nodeSummaries
      .filter((entry) => entry.status === "failed")
      .map(({ nodeId, nodeName, nodeType, error }) => ({
        nodeId,
        nodeName,
        nodeType,
        error,
      }));
    logger.error(`❌ 실패한 노드들:`, failureDetails);

    message = `플로우 실행 부분 실패 (성공: ${summary.success}, 실패: ${summary.failed})`;
  }

  return {
    success,
    message,
    executionTime,
    nodes: nodeSummaries,
    summary,
  };
}
/**
 * Runs a data-transform node: applies its transformation rules, in order,
 * to the input rows. Each rule receives the output of the previous one.
 *
 * When the node has no rules (missing, not an array, or empty) a warning is
 * logged and the input is returned as an array without changes.
 *
 * @param node - Transform node; reads `transformations` from `node.data`.
 * @param inputData - A single row or an array of rows.
 * @param context - Execution context (not read by this method).
 * @returns The transformed rows.
 */
private static async executeDataTransform(
  node: FlowNode,
  inputData: any,
  context: ExecutionContext
): Promise<any[]> {
  const { transformations } = node.data;

  // Normalise the input to an array of rows.
  const rows = Array.isArray(inputData) ? inputData : [inputData];

  const hasRules =
    Array.isArray(transformations) && transformations.length > 0;
  if (!hasRules) {
    logger.warn(`⚠️ 데이터 변환 노드에 변환 규칙이 없습니다: ${node.id}`);
    return rows;
  }

  logger.info(
    `🔄 데이터 변환 시작: ${rows.length}개 행, ${transformations.length}개 변환`
  );

  // Apply the rules one after another.
  let current = rows;
  for (let i = 0; i < transformations.length; i++) {
    const rule = transformations[i];
    logger.info(` 🔹 변환 적용: ${rule.type}`);
    current = this.applyTransformation(current, rule);
  }

  logger.info(`✅ 데이터 변환 완료: ${current.length}개 행`);
  return current;
}
/**
 * Applies a single transformation rule to every row and returns new rows;
 * the input rows are not mutated.
 *
 * The result is written to `targetField`, or to `sourceField` (in place)
 * when no target field is configured.
 *
 * Supported types: UPPERCASE, LOWERCASE, TRIM, EXPLODE, CONCAT, SPLIT,
 * REPLACE and CAST. FORMAT, CALCULATE, JSON_EXTRACT and CUSTOM are not fully
 * implemented and only copy the source value. Unknown types log a warning
 * and return the rows unchanged.
 *
 * Details:
 * - CONCAT joins the source value, `separator` and `expression`; null or
 *   undefined operands become `""`, while `0` and `false` are kept.
 * - REPLACE treats `searchValue` as a regular-expression source and replaces
 *   every match. An empty `searchValue` leaves the value unchanged instead
 *   of inserting `replaceValue` between every character.
 *   NOTE(review): special characters in `searchValue` are not escaped;
 *   confirm whether a literal search is intended.
 *
 * @param rows - Rows to transform.
 * @param transform - Rule definition (`type`, `sourceField`, `targetField`
 *   and type-specific options).
 * @returns The transformed rows.
 */
private static applyTransformation(rows: any[], transform: any): any[] {
  const {
    type,
    sourceField,
    targetField,
    delimiter,
    separator,
    searchValue,
    replaceValue,
    splitIndex,
    castType,
    expression,
  } = transform;

  // An empty target field means "write back to the source field".
  const actualTargetField = targetField || sourceField;

  // Returns copies of all rows with the target field set to compute(row).
  const mapField = (compute: (row: any) => any): any[] =>
    rows.map((row) => ({
      ...row,
      [actualTargetField]: compute(row),
    }));

  switch (type) {
    case "UPPERCASE":
      return mapField(
        (row) => row[sourceField]?.toString().toUpperCase() || row[sourceField]
      );

    case "LOWERCASE":
      return mapField(
        (row) => row[sourceField]?.toString().toLowerCase() || row[sourceField]
      );

    case "TRIM":
      return mapField(
        (row) => row[sourceField]?.toString().trim() || row[sourceField]
      );

    case "EXPLODE":
      return this.applyExplode(
        rows,
        sourceField,
        actualTargetField,
        delimiter || ","
      );

    case "CONCAT": {
      // `??` instead of `||` so that 0 / false are concatenated, not dropped.
      const suffix = expression ?? "";
      return mapField(
        (row) => `${row[sourceField] ?? ""}${separator || ""}${suffix}`
      );
    }

    case "SPLIT": {
      const index = splitIndex !== undefined ? splitIndex : 0;
      return mapField((row) => {
        const value = row[sourceField]?.toString() || "";
        const parts = value.split(delimiter || ",");
        return parts[index] || "";
      });
    }

    case "REPLACE": {
      if (!searchValue) {
        // Nothing to search for: keep the (stringified) value as is.
        return mapField((row) => row[sourceField]?.toString() || "");
      }
      // Compiled once; String.prototype.replace resets lastIndex for global
      // patterns, so sharing the instance across rows is safe.
      const pattern = new RegExp(searchValue, "g");
      return mapField((row) => {
        const value = row[sourceField]?.toString() || "";
        return value.replace(pattern, replaceValue || "");
      });
    }

    case "CAST":
      return mapField((row) => {
        const value = row[sourceField];
        switch (castType) {
          case "string":
            return value?.toString() || "";
          case "number":
            // Unparseable input becomes 0.
            return parseFloat(value) || 0;
          case "boolean":
            return Boolean(value);
          case "date":
            return new Date(value);
          default:
            return value;
        }
      });

    case "FORMAT":
    case "CALCULATE":
    case "JSON_EXTRACT":
    case "CUSTOM":
      // Expression-based transforms are not implemented yet: copy the value.
      logger.warn(`⚠️ ${type} 변환은 아직 완전히 구현되지 않았습니다.`);
      return mapField((row) => row[sourceField] ?? "");

    default:
      logger.warn(`⚠️ 지원하지 않는 변환 타입: ${type}`);
      return rows;
  }
}
/**
 * EXPLODE transform: expands one row into N rows by splitting a field.
 *
 * The source field is converted to a string and split on `delimiter`; for
 * every trimmed piece a copy of the row is emitted with `targetField` set to
 * that piece. Rows whose source value is falsy are passed through unchanged.
 *
 * @param rows - Rows to expand.
 * @param sourceField - Field holding the delimited value.
 * @param targetField - Field receiving each individual piece.
 * @param delimiter - Separator used to split the source value.
 * @returns The expanded rows.
 */
private static applyExplode(
  rows: any[],
  sourceField: string,
  targetField: string,
  delimiter: string
): any[] {
  const output: any[] = [];

  for (let i = 0; i < rows.length; i++) {
    const row = rows[i];
    const raw = row[sourceField];

    // No value to split: keep the original row.
    if (!raw) {
      output.push(row);
      continue;
    }

    // One output row per piece; all other fields are copied.
    const pieces: string[] = raw.toString().split(delimiter);
    pieces.forEach((piece: string) => {
      output.push({
        ...row,
        [targetField]: piece.trim(),
      });
    });
  }

  logger.info(` 💥 EXPLODE: ${rows.length}개 행 → ${output.length}개 행`);
  return output;
}
}