ERP-node/backend-node/src/services/nodeFlowExecutionService.ts

3389 lines
103 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* 노드 플로우 실행 엔진
*
* 기능:
* - 위상 정렬 (Topological Sort)
* - 레벨별 병렬 실행 (Promise.allSettled)
* - 독립 트랜잭션 처리
* - 연쇄 중단 (부모 실패 시 자식 스킵)
*/
import { query, queryOne, transaction } from "../database/db";
import { logger } from "../utils/logger";
import axios from "axios";
// ===== 타입 정의 =====
export interface FlowNode {
id: string;
type: NodeType;
position?: { x: number; y: number };
data: NodeData;
}
export type NodeType =
| "tableSource"
| "externalDBSource"
| "restAPISource"
| "condition"
| "dataTransform"
| "aggregate"
| "insertAction"
| "updateAction"
| "deleteAction"
| "upsertAction"
| "comment"
| "log";
export interface NodeData {
displayName?: string;
[key: string]: any;
}
export interface FlowEdge {
id: string;
source: string;
target: string;
sourceHandle?: string;
targetHandle?: string;
}
export interface ExecutionContext {
sourceData?: any[]; // 외부에서 주입된 데이터 (선택된 행 또는 폼 데이터)
dataSourceType?: string; // "table-selection" | "form" | "none"
nodeResults: Map<string, NodeResult>;
executionOrder: string[];
buttonContext?: ButtonContext;
// 🆕 현재 실행 중인 소스 노드의 dataSourceType (context-data | table-all)
currentNodeDataSourceType?: string;
}
export interface ButtonContext {
buttonId: string;
screenId?: number;
companyCode?: string;
userId?: string;
formData?: Record<string, any>;
selectedRowsData?: Record<string, any>[];
}
export interface NodeResult {
nodeId: string;
status: "pending" | "success" | "failed" | "skipped";
data?: any;
error?: Error;
startTime: number;
endTime?: number;
}
export interface ExecutionResult {
success: boolean;
message: string;
executionTime: number;
nodes: NodeExecutionSummary[];
summary: {
total: number;
success: number;
failed: number;
skipped: number;
};
}
export interface NodeExecutionSummary {
nodeId: string;
nodeName: string;
nodeType: NodeType;
status: "success" | "failed" | "skipped" | "pending";
duration?: number;
error?: string;
}
// ===== 메인 실행 서비스 =====
export class NodeFlowExecutionService {
/**
* 플로우 실행 메인 함수
*/
static async executeFlow(
flowId: number,
contextData: Record<string, any>
): Promise<ExecutionResult> {
const startTime = Date.now();
try {
logger.info(`🚀 플로우 실행 시작: flowId=${flowId}`);
// 1. 플로우 데이터 조회
const flow = await queryOne<{
flow_id: number;
flow_name: string;
flow_data: any;
}>(
`SELECT flow_id, flow_name, flow_data FROM node_flows WHERE flow_id = $1`,
[flowId]
);
if (!flow) {
throw new Error(`플로우를 찾을 수 없습니다: flowId=${flowId}`);
}
const flowData =
typeof flow.flow_data === "string"
? JSON.parse(flow.flow_data)
: flow.flow_data;
const { nodes, edges } = flowData;
logger.info(`📊 플로우 정보:`, {
flowName: flow.flow_name,
nodeCount: nodes.length,
edgeCount: edges.length,
});
// 2. 실행 컨텍스트 준비
const context: ExecutionContext = {
sourceData: contextData.sourceData || [],
dataSourceType: contextData.dataSourceType || "none",
nodeResults: new Map(),
executionOrder: [],
buttonContext: {
buttonId:
contextData.buttonId || contextData.context?.buttonId || "unknown",
screenId: contextData.screenId || contextData.context?.screenId,
companyCode:
contextData.companyCode || contextData.context?.companyCode,
userId: contextData.userId || contextData.context?.userId,
formData: contextData.formData || contextData.context?.formData,
selectedRowsData:
contextData.selectedRowsData ||
contextData.context?.selectedRowsData,
},
};
logger.info(`📦 실행 컨텍스트:`, {
dataSourceType: context.dataSourceType,
sourceDataCount: context.sourceData?.length || 0,
buttonContext: context.buttonContext,
});
// 3. 위상 정렬
const levels = this.topologicalSort(nodes, edges);
logger.info(`📋 실행 순서 (레벨별):`, levels);
// 4. 🔥 전체 플로우를 하나의 트랜잭션으로 실행
let result: ExecutionResult;
try {
result = await transaction(async (client) => {
// 🔥 사용자 ID 세션 변수 설정 (트리거용)
const userId = context.buttonContext?.userId || "system";
await client.query("SELECT set_config('app.user_id', $1, true)", [
userId,
]);
// 트랜잭션 내에서 레벨별 실행
for (const level of levels) {
await this.executeLevel(level, nodes, edges, context, client);
}
// 5. 결과 생성
const executionTime = Date.now() - startTime;
const executionResult = this.generateExecutionResult(
nodes,
context,
executionTime
);
// 실패한 액션 노드가 있으면 롤백
const failedActionNodes = Array.from(
context.nodeResults.values()
).filter(
(result) =>
result.status === "failed" &&
nodes.find(
(n: FlowNode) =>
n.id === result.nodeId && this.isActionNode(n.type)
)
);
if (failedActionNodes.length > 0) {
logger.warn(
`🔄 액션 노드 실패 감지 (${failedActionNodes.length}개), 트랜잭션 롤백`
);
throw new Error(
`액션 노드 실패: ${failedActionNodes.map((n) => n.nodeId).join(", ")}`
);
}
return executionResult;
});
logger.info(`✅ 플로우 실행 완료:`, result.summary);
return result;
} catch (error) {
logger.error(`❌ 플로우 실행 실패, 모든 변경사항 롤백됨:`, error);
throw error;
}
} catch (error) {
logger.error(`❌ 플로우 실행 실패:`, error);
throw error;
}
}
/**
* 소스 데이터 준비
*/
private static prepareSourceData(contextData: Record<string, any>): any[] {
const { controlDataSource, formData, selectedRowsData } = contextData;
switch (controlDataSource) {
case "form":
return formData ? [formData] : [];
case "table-selection":
return selectedRowsData || [];
case "both":
return [
{ source: "form", data: formData },
{ source: "table", data: selectedRowsData },
];
default:
return formData ? [formData] : [];
}
}
/**
* 위상 정렬 (Topological Sort)
* DAG(Directed Acyclic Graph)를 레벨별로 그룹화
*/
private static topologicalSort(
nodes: FlowNode[],
edges: FlowEdge[]
): string[][] {
const levels: string[][] = [];
const inDegree = new Map<string, number>();
const adjacency = new Map<string, string[]>();
// 초기화
nodes.forEach((node) => {
inDegree.set(node.id, 0);
adjacency.set(node.id, []);
});
// 진입 차수 계산
edges.forEach((edge) => {
inDegree.set(edge.target, (inDegree.get(edge.target) || 0) + 1);
adjacency.get(edge.source)?.push(edge.target);
});
// 레벨별 분류
let currentLevel = nodes
.filter((node) => inDegree.get(node.id) === 0)
.map((node) => node.id);
while (currentLevel.length > 0) {
levels.push([...currentLevel]);
const nextLevel: string[] = [];
currentLevel.forEach((nodeId) => {
const neighbors = adjacency.get(nodeId) || [];
neighbors.forEach((neighbor) => {
const newDegree = (inDegree.get(neighbor) || 1) - 1;
inDegree.set(neighbor, newDegree);
if (newDegree === 0) {
nextLevel.push(neighbor);
}
});
});
currentLevel = nextLevel;
}
return levels;
}
/**
* 레벨 내 노드 병렬 실행
*/
private static async executeLevel(
nodeIds: string[],
nodes: FlowNode[],
edges: FlowEdge[],
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<void> {
logger.info(`⏳ 레벨 실행 시작: ${nodeIds.length}개 노드`);
// Promise.allSettled로 병렬 실행
const results = await Promise.allSettled(
nodeIds.map((nodeId) =>
this.executeNode(nodeId, nodes, edges, context, client)
)
);
// 결과 저장
results.forEach((result, index) => {
const nodeId = nodeIds[index];
if (result.status === "fulfilled") {
context.nodeResults.set(nodeId, result.value);
context.executionOrder.push(nodeId);
} else {
context.nodeResults.set(nodeId, {
nodeId,
status: "failed",
error: result.reason,
startTime: Date.now(),
endTime: Date.now(),
});
}
});
logger.info(`✅ 레벨 실행 완료`);
}
/**
* 개별 노드 실행
*/
private static async executeNode(
nodeId: string,
nodes: FlowNode[],
edges: FlowEdge[],
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<NodeResult> {
const startTime = Date.now();
const node = nodes.find((n) => n.id === nodeId);
if (!node) {
throw new Error(`노드를 찾을 수 없습니다: ${nodeId}`);
}
logger.info(`🔄 노드 실행 시작: ${nodeId} (${node.type})`);
// 1. 부모 노드 상태 확인 (연쇄 중단)
const parents = this.getParentNodes(nodeId, edges);
const parentFailed = parents.some((parentId) => {
const parentResult = context.nodeResults.get(parentId);
return parentResult?.status === "failed";
});
if (parentFailed) {
logger.warn(`⏭️ 노드 스킵 (부모 실패): ${nodeId}`);
return {
nodeId,
status: "skipped",
error: new Error("Parent node failed"),
startTime,
endTime: Date.now(),
};
}
// 2. 입력 데이터 준비
const inputData = this.prepareInputData(nodeId, parents, edges, context);
// 3. 노드 타입별 실행
try {
const result = await this.executeNodeByType(
node,
inputData,
context,
client
);
logger.info(`✅ 노드 실행 성공: ${nodeId}`);
return {
nodeId,
status: "success",
data: result,
startTime,
endTime: Date.now(),
};
} catch (error) {
logger.error(`❌ 노드 실행 실패: ${nodeId}`, error);
return {
nodeId,
status: "failed",
error: error as Error,
startTime,
endTime: Date.now(),
};
}
}
/**
* 부모 노드 목록 조회
*/
private static getParentNodes(nodeId: string, edges: FlowEdge[]): string[] {
return edges
.filter((edge) => edge.target === nodeId)
.map((edge) => edge.source);
}
/**
* 입력 데이터 준비
*/
private static prepareInputData(
nodeId: string,
parents: string[],
edges: FlowEdge[],
context: ExecutionContext
): any {
if (parents.length === 0) {
// 소스 노드: 원본 데이터 사용
return context.sourceData;
} else if (parents.length === 1) {
// 단일 부모: 부모의 결과 데이터 전달
const parentId = parents[0];
const parentResult = context.nodeResults.get(parentId);
let data = parentResult?.data || context.sourceData;
// 🔥 조건 노드에서 온 데이터인 경우 sourceHandle 확인
const edge = edges.find(
(e) => e.source === parentId && e.target === nodeId
);
if (
edge?.sourceHandle &&
data &&
typeof data === "object" &&
"conditionResult" in data
) {
// 조건 노드의 결과 객체
if (edge.sourceHandle === "true") {
logger.info(
`✅ TRUE 브랜치 데이터 사용: ${data.trueData?.length || 0}`
);
return data.trueData || [];
} else if (edge.sourceHandle === "false") {
logger.info(
`✅ FALSE 브랜치 데이터 사용: ${data.falseData?.length || 0}`
);
return data.falseData || [];
} else {
// sourceHandle이 없거나 다른 값이면 allData 사용
return data.allData || data;
}
}
return data;
} else {
// 다중 부모: 모든 부모의 데이터 병합
const allData: any[] = [];
parents.forEach((parentId) => {
const parentResult = context.nodeResults.get(parentId);
let data = parentResult?.data || context.sourceData;
// 🔥 조건 노드에서 온 데이터인 경우 sourceHandle 확인
const edge = edges.find(
(e) => e.source === parentId && e.target === nodeId
);
if (
edge?.sourceHandle &&
data &&
typeof data === "object" &&
"conditionResult" in data
) {
// 조건 노드의 결과 객체
if (edge.sourceHandle === "true") {
data = data.trueData || [];
} else if (edge.sourceHandle === "false") {
data = data.falseData || [];
} else {
data = data.allData || data;
}
}
// 배열이면 펼쳐서 추가
if (Array.isArray(data)) {
allData.push(...data);
} else {
allData.push(data);
}
});
logger.info(
`🔗 다중 부모 병합: ${parents.length}개 부모, 총 ${allData.length}건 데이터`
);
return allData;
}
}
/**
* 노드 타입별 실행 로직
*/
private static async executeNodeByType(
node: FlowNode,
inputData: any,
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<any> {
switch (node.type) {
case "tableSource":
return this.executeTableSource(node, context);
case "externalDBSource":
return this.executeExternalDBSource(node, context);
case "restAPISource":
return this.executeRestAPISource(node, context);
case "dataTransform":
return this.executeDataTransform(node, inputData, context);
case "aggregate":
return this.executeAggregate(node, inputData, context);
case "insertAction":
return this.executeInsertAction(node, inputData, context, client);
case "updateAction":
return this.executeUpdateAction(node, inputData, context, client);
case "deleteAction":
return this.executeDeleteAction(node, inputData, context, client);
case "upsertAction":
return this.executeUpsertAction(node, inputData, context, client);
case "condition":
return this.executeCondition(node, inputData, context);
case "comment":
case "log":
// 로그/코멘트는 실행 없이 통과
logger.info(`📝 ${node.type}: ${node.data.displayName || node.id}`);
return { message: "Logged" };
default:
logger.warn(`⚠️ 지원하지 않는 노드 타입: ${node.type}`);
return { message: "Unsupported node type" };
}
}
/**
* REST API 소스 노드 실행
*/
private static async executeRestAPISource(
node: FlowNode,
context: ExecutionContext
): Promise<any[]> {
const {
url,
method = "GET",
headers = {},
body,
timeout = 30000,
responseMapping,
authentication,
} = node.data;
if (!url) {
throw new Error("REST API URL이 설정되지 않았습니다.");
}
logger.info(`🌐 REST API 호출: ${method} ${url}`);
try {
// 헤더 설정
const requestHeaders: any = { ...headers };
// 인증 헤더 추가
if (authentication) {
if (authentication.type === "bearer" && authentication.token) {
requestHeaders["Authorization"] = `Bearer ${authentication.token}`;
} else if (
authentication.type === "basic" &&
authentication.username &&
authentication.password
) {
const credentials = Buffer.from(
`${authentication.username}:${authentication.password}`
).toString("base64");
requestHeaders["Authorization"] = `Basic ${credentials}`;
} else if (authentication.type === "apikey" && authentication.token) {
const headerName = authentication.apiKeyHeader || "X-API-Key";
requestHeaders[headerName] = authentication.token;
}
}
if (!requestHeaders["Content-Type"]) {
requestHeaders["Content-Type"] = "application/json";
}
// API 호출
const response = await axios({
method: method.toLowerCase(),
url,
headers: requestHeaders,
data: body,
timeout,
});
logger.info(`✅ REST API 응답 수신: ${response.status}`);
let responseData = response.data;
// 🔥 표준 API 응답 형식 자동 감지 { success, message, data }
if (
!responseMapping &&
responseData &&
typeof responseData === "object" &&
"success" in responseData &&
"data" in responseData
) {
logger.info("🔍 표준 API 응답 형식 감지, data 속성 자동 추출");
responseData = responseData.data;
}
// responseMapping이 있으면 해당 경로의 데이터 추출
if (responseMapping && responseData) {
logger.info(`🔍 응답 매핑 적용: ${responseMapping}`);
const path = responseMapping.split(".");
for (const key of path) {
if (
responseData &&
typeof responseData === "object" &&
key in responseData
) {
responseData = responseData[key];
} else {
logger.warn(
`⚠️ 응답 매핑 경로를 찾을 수 없습니다: ${responseMapping}`
);
break;
}
}
}
// 배열이 아니면 배열로 변환
if (!Array.isArray(responseData)) {
logger.info("🔄 단일 객체를 배열로 변환");
responseData = [responseData];
}
logger.info(`📦 REST API 데이터 ${responseData.length}건 반환`);
// 첫 번째 데이터 샘플 상세 로깅
if (responseData.length > 0) {
console.log("🔍 REST API 응답 데이터 샘플 (첫 번째 항목):");
console.log(JSON.stringify(responseData[0], null, 2));
console.log("🔑 사용 가능한 필드명:", Object.keys(responseData[0]));
}
return responseData;
} catch (error: any) {
logger.error(`❌ REST API 호출 실패:`, error.message);
throw new Error(`REST API 호출 실패: ${error.message}`);
}
}
/**
* 외부 DB 소스 노드 실행
*/
private static async executeExternalDBSource(
node: FlowNode,
context: ExecutionContext
): Promise<any[]> {
const { connectionId, tableName, schema, whereConditions, dataSourceType } =
node.data;
// 🆕 노드의 dataSourceType 확인 (기본값: context-data)
const nodeDataSourceType = dataSourceType || "context-data";
// 🆕 ExecutionContext에 현재 소스 노드의 dataSourceType 저장
context.currentNodeDataSourceType = nodeDataSourceType;
logger.info(
`🔌 외부 DB 소스 노드 실행: ${connectionId}.${tableName}, dataSourceType=${nodeDataSourceType}`
);
// 1. context-data 모드: 외부에서 주입된 데이터 사용
if (nodeDataSourceType === "context-data") {
if (
context.sourceData &&
Array.isArray(context.sourceData) &&
context.sourceData.length > 0
) {
logger.info(
`📊 컨텍스트 데이터 사용: ${context.dataSourceType}, ${context.sourceData.length}`
);
return context.sourceData;
}
logger.warn(
`⚠️ context-data 모드이지만 전달된 데이터가 없습니다. 빈 배열 반환.`
);
return [];
}
// 2. table-all 모드: 외부 DB 테이블 전체 데이터 조회
if (nodeDataSourceType === "table-all") {
if (!connectionId || !tableName) {
throw new Error(
"외부 DB 연결 정보 또는 테이블명이 설정되지 않았습니다."
);
}
try {
// 연결 풀 서비스 임포트 (동적 임포트로 순환 참조 방지)
const { ExternalDbConnectionPoolService } = await import(
"./externalDbConnectionPoolService"
);
const poolService = ExternalDbConnectionPoolService.getInstance();
// 스키마 접두사 처리
const schemaPrefix = schema ? `${schema}.` : "";
const fullTableName = `${schemaPrefix}${tableName}`;
// WHERE 절 생성
let sql = `SELECT * FROM ${fullTableName}`;
let params: any[] = [];
if (whereConditions && whereConditions.length > 0) {
const whereResult = this.buildWhereClause(whereConditions);
sql += ` ${whereResult.clause}`;
params = whereResult.values;
}
logger.info(`📊 외부 DB 쿼리 실행: ${sql}`);
// 연결 풀을 통해 쿼리 실행
const result = await poolService.executeQuery(
connectionId,
sql,
params
);
logger.info(
`✅ 외부 DB 전체 데이터 조회 완료: ${tableName}, ${result.length}`
);
return result;
} catch (error: any) {
logger.error(`❌ 외부 DB 소스 조회 실패:`, error);
throw new Error(
`외부 DB 조회 실패 (연결 ID: ${connectionId}): ${error.message}`
);
}
}
// 3. 알 수 없는 모드 (기본값으로 처리)
logger.warn(
`⚠️ 알 수 없는 dataSourceType: ${nodeDataSourceType}, context-data로 처리`
);
if (
context.sourceData &&
Array.isArray(context.sourceData) &&
context.sourceData.length > 0
) {
return context.sourceData;
}
return [];
}
/**
* 테이블 소스 노드 실행
*/
private static async executeTableSource(
node: FlowNode,
context: ExecutionContext
): Promise<any[]> {
const { tableName, schema, whereConditions, dataSourceType } = node.data;
// 🆕 노드의 dataSourceType 확인 (기본값: context-data)
const nodeDataSourceType = dataSourceType || "context-data";
// 🆕 ExecutionContext에 현재 소스 노드의 dataSourceType 저장
context.currentNodeDataSourceType = nodeDataSourceType;
logger.info(
`📊 테이블 소스 노드 실행: ${tableName}, dataSourceType=${nodeDataSourceType}`
);
// 1. context-data 모드: 외부에서 주입된 데이터 사용
if (nodeDataSourceType === "context-data") {
if (
context.sourceData &&
Array.isArray(context.sourceData) &&
context.sourceData.length > 0
) {
logger.info(
`📊 컨텍스트 데이터 사용: ${context.dataSourceType}, ${context.sourceData.length}`
);
return context.sourceData;
}
logger.warn(
`⚠️ context-data 모드이지만 전달된 데이터가 없습니다. 빈 배열 반환.`
);
return [];
}
// 2. table-all 모드: 테이블 전체 데이터 조회
if (nodeDataSourceType === "table-all") {
if (!tableName) {
logger.warn("⚠️ 테이블 소스 노드에 테이블명이 없습니다.");
return [];
}
const schemaPrefix = schema ? `${schema}.` : "";
const whereResult = whereConditions
? this.buildWhereClause(whereConditions)
: { clause: "", values: [] };
const sql = `SELECT * FROM ${schemaPrefix}${tableName} ${whereResult.clause}`;
logger.info(`📊 테이블 전체 데이터 조회 SQL: ${sql}`);
const result = await query(sql, whereResult.values);
logger.info(
`📊 테이블 전체 데이터 조회: ${tableName}, ${result.length}`
);
// 디버깅: 조회된 데이터 샘플 출력
if (result.length > 0) {
logger.info(`📊 조회된 데이터 샘플: ${JSON.stringify(result[0])?.substring(0, 300)}`);
}
return result;
}
// 3. 알 수 없는 모드 (기본값으로 처리)
logger.warn(
`⚠️ 알 수 없는 dataSourceType: ${nodeDataSourceType}, context-data로 처리`
);
if (
context.sourceData &&
Array.isArray(context.sourceData) &&
context.sourceData.length > 0
) {
return context.sourceData;
}
return [];
}
/**
* INSERT 액션 노드 실행
*/
private static async executeInsertAction(
node: FlowNode,
inputData: any,
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<any> {
const { targetType } = node.data;
// 🔥 타겟 타입별 분기
switch (targetType) {
case "internal":
return this.executeInternalInsert(node, inputData, context, client);
case "external":
return this.executeExternalInsert(node, inputData, context);
case "api":
return this.executeApiInsert(node, inputData, context);
default:
// 하위 호환성: targetType이 없으면 internal로 간주
logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
return this.executeInternalInsert(node, inputData, context, client);
}
}
/**
* 내부 DB INSERT 실행
*/
private static async executeInternalInsert(
node: FlowNode,
inputData: any,
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<any> {
const { targetTable, fieldMappings } = node.data;
logger.info(`💾 INSERT 노드 실행: ${targetTable}`);
console.log(
"📥 입력 데이터 타입:",
typeof inputData,
Array.isArray(inputData) ? `배열(${inputData.length}건)` : "단일 객체"
);
if (inputData && inputData.length > 0) {
console.log("📄 첫 번째 입력 데이터:");
console.log(JSON.stringify(inputData[0], null, 2));
console.log("🔑 입력 데이터 필드명:", Object.keys(inputData[0]));
}
// 🔥 트랜잭션 클라이언트가 있으면 사용, 없으면 독립 트랜잭션
const executeInsert = async (txClient: any) => {
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
let insertedCount = 0;
const insertedDataArray: any[] = [];
for (const data of dataArray) {
const fields: string[] = [];
const values: any[] = [];
// 🔥 삽입된 데이터 복사본 생성
const insertedData = { ...data };
console.log("🗺️ 필드 매핑 처리 중...");
fieldMappings.forEach((mapping: any) => {
fields.push(mapping.targetField);
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
console.log(
` ${mapping.sourceField}${mapping.targetField}: ${value === undefined ? "❌ undefined" : "✅ " + value}`
);
values.push(value);
// 🔥 삽입된 값을 데이터에 반영
insertedData[mapping.targetField] = value;
});
// 🆕 writer와 company_code 자동 추가 (필드 매핑에 없는 경우)
const hasWriterMapping = fieldMappings.some((m: any) => m.targetField === "writer");
const hasCompanyCodeMapping = fieldMappings.some((m: any) => m.targetField === "company_code");
// 컨텍스트에서 사용자 정보 추출
const userId = context.buttonContext?.userId;
const companyCode = context.buttonContext?.companyCode;
// writer 자동 추가 (매핑에 없고, 컨텍스트에 userId가 있는 경우)
if (!hasWriterMapping && userId) {
fields.push("writer");
values.push(userId);
insertedData.writer = userId;
console.log(` 🔧 자동 추가: writer = ${userId}`);
}
// company_code 자동 추가 (매핑에 없고, 컨텍스트에 companyCode가 있는 경우)
if (!hasCompanyCodeMapping && companyCode && companyCode !== "*") {
fields.push("company_code");
values.push(companyCode);
insertedData.company_code = companyCode;
console.log(` 🔧 자동 추가: company_code = ${companyCode}`);
}
const sql = `
INSERT INTO ${targetTable} (${fields.join(", ")})
VALUES (${fields.map((_, i) => `$${i + 1}`).join(", ")})
RETURNING *
`;
console.log("📝 실행할 SQL:", sql);
console.log("📊 바인딩 값:", values);
const result = await txClient.query(sql, values);
insertedCount++;
// 🔥 RETURNING으로 받은 실제 삽입 데이터 사용 (AUTO_INCREMENT 등 포함)
if (result.rows && result.rows.length > 0) {
insertedDataArray.push(result.rows[0]);
} else {
// RETURNING이 없으면 생성한 데이터 사용
insertedDataArray.push(insertedData);
}
}
logger.info(
`✅ INSERT 완료 (내부 DB): ${targetTable}, ${insertedCount}`
);
// 🔥 삽입된 데이터 반환 (AUTO_INCREMENT ID 등 포함)
return insertedDataArray;
};
// 🔥 클라이언트가 전달되었으면 사용, 아니면 독립 트랜잭션 생성
if (client) {
return executeInsert(client);
} else {
return transaction(executeInsert);
}
}
/**
* 외부 DB INSERT 실행
*/
private static async executeExternalInsert(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const {
externalConnectionId,
externalDbType,
externalTargetTable,
fieldMappings,
} = node.data;
if (!externalConnectionId || !externalTargetTable) {
throw new Error("외부 DB 커넥션 또는 테이블이 설정되지 않았습니다.");
}
logger.info(
`🔌 외부 DB INSERT 시작: ${externalDbType} - ${externalTargetTable}`
);
// 외부 DB 커넥터 생성
const connector = await this.createExternalConnector(
externalConnectionId,
externalDbType
);
try {
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
let insertedCount = 0;
const insertedDataArray: any[] = [];
// 🔥 Oracle의 경우 autoCommit을 false로 설정하여 트랜잭션 제어
const isOracle = externalDbType.toLowerCase() === "oracle";
for (const data of dataArray) {
const fields: string[] = [];
const values: any[] = [];
const insertedData: any = { ...data };
fieldMappings.forEach((mapping: any) => {
fields.push(mapping.targetField);
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
values.push(value);
// 🔥 삽입된 데이터 객체에 매핑된 값 적용
insertedData[mapping.targetField] = value;
});
// 외부 DB별 SQL 문법 차이 처리
let sql: string;
let params: any[];
if (isOracle) {
// Oracle: :1, :2, ... 형식
const placeholders = fields.map((_, i) => `:${i + 1}`).join(", ");
sql = `INSERT INTO ${externalTargetTable} (${fields.join(", ")}) VALUES (${placeholders})`;
params = values;
} else if (
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
) {
// MySQL/MariaDB: ? 형식
const placeholders = fields.map(() => "?").join(", ");
sql = `INSERT INTO ${externalTargetTable} (${fields.join(", ")}) VALUES (${placeholders})`;
params = values;
} else if (externalDbType.toLowerCase() === "mssql") {
// MSSQL: @p1, @p2, ... 형식
const placeholders = fields.map((_, i) => `@p${i + 1}`).join(", ");
sql = `INSERT INTO ${externalTargetTable} (${fields.join(", ")}) VALUES (${placeholders})`;
params = values;
} else {
// PostgreSQL: $1, $2, ... 형식 (기본)
const placeholders = fields.map((_, i) => `$${i + 1}`).join(", ");
sql = `INSERT INTO ${externalTargetTable} (${fields.join(", ")}) VALUES (${placeholders})`;
params = values;
}
await connector.executeQuery(sql, params);
insertedCount++;
insertedDataArray.push(insertedData);
}
// 🔥 Oracle의 경우 명시적 COMMIT
await this.commitExternalTransaction(
connector,
externalDbType,
insertedCount
);
logger.info(
`✅ INSERT 완료 (외부 DB): ${externalTargetTable}, ${insertedCount}`
);
// 🔥 삽입된 데이터 반환 (외부 DB는 자동 생성 ID 없으므로 입력 데이터 기반)
return insertedDataArray;
} catch (error) {
// 🔥 Oracle의 경우 오류 시 ROLLBACK
await this.rollbackExternalTransaction(connector, externalDbType);
throw error;
} finally {
// 연결 해제
await connector.disconnect();
}
}
/**
* REST API INSERT 실행 (POST 요청)
*/
private static async executeApiInsert(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const {
apiEndpoint,
apiMethod,
apiAuthType,
apiAuthConfig,
apiHeaders,
apiBodyTemplate,
fieldMappings,
} = node.data;
if (!apiEndpoint) {
throw new Error("API 엔드포인트가 설정되지 않았습니다.");
}
logger.info(`🌐 REST API INSERT 시작: ${apiMethod} ${apiEndpoint}`);
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
const results: any[] = [];
for (const data of dataArray) {
// 헤더 설정
const headers: any = { ...apiHeaders };
// 인증 헤더 추가
if (apiAuthType === "bearer" && apiAuthConfig?.token) {
headers["Authorization"] = `Bearer ${apiAuthConfig.token}`;
} else if (
apiAuthType === "basic" &&
apiAuthConfig?.username &&
apiAuthConfig?.password
) {
const credentials = Buffer.from(
`${apiAuthConfig.username}:${apiAuthConfig.password}`
).toString("base64");
headers["Authorization"] = `Basic ${credentials}`;
} else if (apiAuthType === "apikey" && apiAuthConfig?.apiKey) {
const headerName = apiAuthConfig.apiKeyHeader || "X-API-Key";
headers[headerName] = apiAuthConfig.apiKey;
}
// Content-Type 기본값 설정
if (!headers["Content-Type"]) {
headers["Content-Type"] = "application/json";
}
// 바디 생성 (템플릿 또는 필드 매핑)
let body: any;
if (apiBodyTemplate) {
// 템플릿 변수 치환
body = this.replaceTemplateVariables(apiBodyTemplate, data);
} else if (fieldMappings && fieldMappings.length > 0) {
// 필드 매핑 사용
body = {};
fieldMappings.forEach((mapping: any) => {
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
body[mapping.targetField] = value;
});
} else {
// 전체 데이터 전송
body = data;
}
try {
const response = await axios({
method: apiMethod || "POST",
url: apiEndpoint,
headers,
data: body,
timeout: 30000, // 30초 타임아웃
});
results.push({
status: response.status,
data: response.data,
});
} catch (error: any) {
logger.error(
`❌ API 요청 실패: ${error.response?.status || error.message}`
);
throw error;
}
}
logger.info(`✅ REST API INSERT 완료: ${results.length}`);
return { results };
}
/**
* 템플릿 변수 치환 ({{variable}} 형식)
*/
private static replaceTemplateVariables(template: string, data: any): string {
let result = template;
// {{variable}} 형식의 변수를 찾아서 치환
const matches = template.match(/\{\{([^}]+)\}\}/g);
if (matches) {
matches.forEach((match) => {
const key = match.replace(/\{\{|\}\}/g, "").trim();
const value = this.getNestedValue(data, key);
result = result.replace(match, value !== undefined ? value : "");
});
}
return result;
}
/**
* 중첩된 객체 값 가져오기 (예: "user.name")
*/
private static getNestedValue(obj: any, path: string): any {
return path.split(".").reduce((current, key) => current?.[key], obj);
}
/**
* 외부 DB 커넥터 생성 (공통 로직)
*/
private static async createExternalConnector(
connectionId: number,
dbType: string
): Promise<any> {
// 🔥 연결 풀 서비스를 통한 연결 관리 (연결 풀 고갈 방지)
const { ExternalDbConnectionPoolService } = await import(
"./externalDbConnectionPoolService"
);
const poolService = ExternalDbConnectionPoolService.getInstance();
const pool = await poolService.getPool(connectionId);
// DatabaseConnectorFactory와 호환되도록 래퍼 객체 반환
return {
executeQuery: async (sql: string, params?: any[]) => {
const result = await pool.query(sql, params);
return {
rows: Array.isArray(result) ? result : [result],
rowCount: Array.isArray(result) ? result.length : 1,
affectedRows: Array.isArray(result) ? result.length : 1,
};
},
disconnect: async () => {
// 연결 풀은 자동 관리되므로 즉시 종료하지 않음
logger.debug(`📌 연결 풀 유지 (ID: ${connectionId})`);
},
};
}
/**
* 외부 DB 트랜잭션 커밋 (Oracle 전용)
*/
private static async commitExternalTransaction(
connector: any,
dbType: string,
count: number
): Promise<void> {
if (dbType.toLowerCase() === "oracle" && count > 0) {
await connector.executeQuery("COMMIT");
logger.info(`✅ Oracle COMMIT 실행: ${count}`);
}
}
/**
* 외부 DB 트랜잭션 롤백 (Oracle 전용)
*/
private static async rollbackExternalTransaction(
connector: any,
dbType: string
): Promise<void> {
if (dbType.toLowerCase() === "oracle") {
try {
await connector.executeQuery("ROLLBACK");
logger.info(`⚠️ Oracle ROLLBACK 실행 (오류 발생)`);
} catch (rollbackError) {
logger.error(`❌ Oracle ROLLBACK 실패:`, rollbackError);
}
}
}
/**
* UPDATE 액션 노드 실행
*/
private static async executeUpdateAction(
node: FlowNode,
inputData: any,
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<any> {
const { targetType } = node.data;
// 🔥 타겟 타입별 분기
switch (targetType) {
case "internal":
return this.executeInternalUpdate(node, inputData, context, client);
case "external":
return this.executeExternalUpdate(node, inputData, context);
case "api":
return this.executeApiUpdate(node, inputData, context);
default:
// 하위 호환성: targetType이 없으면 internal로 간주
logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
return this.executeInternalUpdate(node, inputData, context, client);
}
}
/**
* 내부 DB UPDATE 실행
*/
private static async executeInternalUpdate(
node: FlowNode,
inputData: any,
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<any> {
const { targetTable, fieldMappings, whereConditions } = node.data;
logger.info(`🔄 UPDATE 노드 실행: ${targetTable}`);
console.log(
"📥 입력 데이터 타입:",
typeof inputData,
Array.isArray(inputData) ? `배열(${inputData.length}건)` : "단일 객체"
);
if (inputData && inputData.length > 0) {
console.log("📄 첫 번째 입력 데이터:");
console.log(JSON.stringify(inputData[0], null, 2));
console.log("🔑 입력 데이터 필드명:", Object.keys(inputData[0]));
}
// 🔥 트랜잭션 클라이언트가 있으면 사용, 없으면 독립 트랜잭션
const executeUpdate = async (txClient: any) => {
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
let updatedCount = 0;
const updatedDataArray: any[] = [];
// 🆕 table-all 모드: 각 그룹별로 UPDATE 실행 (집계 결과 반영)
if (context.currentNodeDataSourceType === "table-all") {
console.log("🚀 table-all 모드: 그룹별 업데이트 시작 (총 " + dataArray.length + "개 그룹)");
// 🔥 각 그룹(데이터)별로 UPDATE 실행
for (let i = 0; i < dataArray.length; i++) {
const data = dataArray[i];
const setClauses: string[] = [];
const values: any[] = [];
let paramIndex = 1;
console.log(`\n📦 그룹 ${i + 1}/${dataArray.length} 처리 중...`);
console.log("🗺️ 필드 매핑 처리 중...");
fieldMappings.forEach((mapping: any) => {
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
console.log(
` ${mapping.sourceField}${mapping.targetField}: ${value === undefined ? "❌ undefined" : "✅ " + value}`
);
if (mapping.targetField) {
setClauses.push(`${mapping.targetField} = $${paramIndex}`);
values.push(value);
paramIndex++;
}
});
// WHERE 조건 (사용자 정의 조건만 사용, PK 자동 추가 안 함)
const whereResult = this.buildWhereClause(
whereConditions,
data,
paramIndex
);
values.push(...whereResult.values);
const sql = `
UPDATE ${targetTable}
SET ${setClauses.join(", ")}
${whereResult.clause}
`;
console.log("📝 실행할 SQL:", sql);
console.log("📊 바인딩 값:", values);
const result = await txClient.query(sql, values);
const rowCount = result.rowCount || 0;
updatedCount += rowCount;
console.log(`✅ 그룹 ${i + 1} UPDATE 완료: ${rowCount}`);
}
logger.info(
`✅ UPDATE 완료 (내부 DB, 그룹별 처리): ${targetTable}, 총 ${updatedCount}`
);
// 업데이트된 데이터는 원본 배열 반환 (실제 DB에서 다시 조회하지 않음)
return dataArray;
}
// 🆕 context-data 모드: 개별 업데이트 (PK 자동 추가)
console.log("🎯 context-data 모드: 개별 업데이트 시작");
for (const data of dataArray) {
const setClauses: string[] = [];
const values: any[] = [];
let paramIndex = 1;
// 🔥 업데이트된 데이터 복사본 생성
const updatedData = { ...data };
console.log("🗺️ 필드 매핑 처리 중...");
fieldMappings.forEach((mapping: any) => {
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
console.log(
` ${mapping.sourceField}${mapping.targetField}: ${value === undefined ? "❌ undefined" : "✅ " + value}`
);
// targetField가 비어있지 않은 경우만 추가
if (mapping.targetField) {
setClauses.push(`${mapping.targetField} = $${paramIndex}`);
values.push(value);
paramIndex++;
// 🔥 업데이트된 값을 데이터에 반영
updatedData[mapping.targetField] = value;
} else {
console.log(
`⚠️ targetField가 비어있어 스킵: ${mapping.sourceField}`
);
}
});
// 🔑 Primary Key 자동 추가 (context-data 모드)
console.log("🔑 context-data 모드: Primary Key 자동 추가");
const enhancedWhereConditions = await this.enhanceWhereConditionsWithPK(
whereConditions,
data,
targetTable
);
const whereResult = this.buildWhereClause(
enhancedWhereConditions,
data,
paramIndex
);
// WHERE 절의 값들을 values 배열에 추가
values.push(...whereResult.values);
const sql = `
UPDATE ${targetTable}
SET ${setClauses.join(", ")}
${whereResult.clause}
`;
console.log("📝 실행할 SQL:", sql);
console.log("📊 바인딩 값:", values);
const result = await txClient.query(sql, values);
updatedCount += result.rowCount || 0;
// 🔥 업데이트된 데이터 저장
updatedDataArray.push(updatedData);
}
logger.info(
`✅ UPDATE 완료 (내부 DB): ${targetTable}, ${updatedCount}`
);
// 🔥 업데이트된 데이터 반환 (다음 노드에서 사용)
return updatedDataArray;
};
// 🔥 클라이언트가 전달되었으면 사용, 없으면 독립 트랜잭션 생성
if (client) {
return executeUpdate(client);
} else {
return transaction(executeUpdate);
}
}
/**
* 외부 DB UPDATE 실행
*/
private static async executeExternalUpdate(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const {
externalConnectionId,
externalDbType,
externalTargetTable,
fieldMappings,
whereConditions,
} = node.data;
if (!externalConnectionId || !externalTargetTable) {
throw new Error("외부 DB 커넥션 또는 테이블이 설정되지 않았습니다.");
}
logger.info(
`🔌 외부 DB UPDATE 시작: ${externalDbType} - ${externalTargetTable}`
);
// 외부 DB 커넥터 생성
const connector = await this.createExternalConnector(
externalConnectionId,
externalDbType
);
try {
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
let updatedCount = 0;
const updatedDataArray: any[] = [];
for (const data of dataArray) {
const setClauses: string[] = [];
const values: any[] = [];
let paramIndex = 1;
const updatedData: any = { ...data };
fieldMappings.forEach((mapping: any) => {
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
// DB별 플레이스홀더
if (externalDbType.toLowerCase() === "oracle") {
setClauses.push(`${mapping.targetField} = :${paramIndex}`);
} else if (
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
) {
setClauses.push(`${mapping.targetField} = ?`);
} else if (externalDbType.toLowerCase() === "mssql") {
setClauses.push(`${mapping.targetField} = @p${paramIndex}`);
} else {
setClauses.push(`${mapping.targetField} = $${paramIndex}`);
}
values.push(value);
paramIndex++;
// 🔥 업데이트된 데이터 객체에 매핑된 값 적용
updatedData[mapping.targetField] = value;
});
// WHERE 조건 생성
const whereClauses: string[] = [];
whereConditions?.forEach((condition: any) => {
const condValue = data[condition.field];
if (condition.operator === "IS NULL") {
whereClauses.push(`${condition.field} IS NULL`);
} else if (condition.operator === "IS NOT NULL") {
whereClauses.push(`${condition.field} IS NOT NULL`);
} else {
if (externalDbType.toLowerCase() === "oracle") {
whereClauses.push(
`${condition.field} ${condition.operator} :${paramIndex}`
);
} else if (
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
) {
whereClauses.push(`${condition.field} ${condition.operator} ?`);
} else if (externalDbType.toLowerCase() === "mssql") {
whereClauses.push(
`${condition.field} ${condition.operator} @p${paramIndex}`
);
} else {
whereClauses.push(
`${condition.field} ${condition.operator} $${paramIndex}`
);
}
values.push(condValue);
paramIndex++;
}
});
const whereClause =
whereClauses.length > 0 ? `WHERE ${whereClauses.join(" AND ")}` : "";
const sql = `UPDATE ${externalTargetTable} SET ${setClauses.join(", ")} ${whereClause}`;
const result = await connector.executeQuery(sql, values);
updatedCount += result.rowCount || result.affectedRows || 0;
updatedDataArray.push(updatedData);
}
// 🔥 Oracle의 경우 명시적 COMMIT
await this.commitExternalTransaction(
connector,
externalDbType,
updatedCount
);
logger.info(
`✅ UPDATE 완료 (외부 DB): ${externalTargetTable}, ${updatedCount}`
);
// 🔥 업데이트된 데이터 반환
return updatedDataArray;
} catch (error) {
// 🔥 Oracle의 경우 오류 시 ROLLBACK
await this.rollbackExternalTransaction(connector, externalDbType);
throw error;
} finally {
await connector.disconnect();
}
}
/**
* REST API UPDATE 실행 (PUT/PATCH 요청)
*/
private static async executeApiUpdate(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const {
apiEndpoint,
apiMethod,
apiAuthType,
apiAuthConfig,
apiHeaders,
apiBodyTemplate,
fieldMappings,
} = node.data;
if (!apiEndpoint) {
throw new Error("API 엔드포인트가 설정되지 않았습니다.");
}
logger.info(`🌐 REST API UPDATE 시작: ${apiMethod} ${apiEndpoint}`);
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
const results: any[] = [];
for (const data of dataArray) {
// 헤더 설정
const headers: any = { ...apiHeaders };
// 인증 헤더 추가
if (apiAuthType === "bearer" && apiAuthConfig?.token) {
headers["Authorization"] = `Bearer ${apiAuthConfig.token}`;
} else if (
apiAuthType === "basic" &&
apiAuthConfig?.username &&
apiAuthConfig?.password
) {
const credentials = Buffer.from(
`${apiAuthConfig.username}:${apiAuthConfig.password}`
).toString("base64");
headers["Authorization"] = `Basic ${credentials}`;
} else if (apiAuthType === "apikey" && apiAuthConfig?.apiKey) {
const headerName = apiAuthConfig.apiKeyHeader || "X-API-Key";
headers[headerName] = apiAuthConfig.apiKey;
}
if (!headers["Content-Type"]) {
headers["Content-Type"] = "application/json";
}
// 바디 생성
let body: any;
if (apiBodyTemplate) {
body = this.replaceTemplateVariables(apiBodyTemplate, data);
} else if (fieldMappings && fieldMappings.length > 0) {
body = {};
fieldMappings.forEach((mapping: any) => {
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
body[mapping.targetField] = value;
});
} else {
body = data;
}
try {
const response = await axios({
method: apiMethod || "PUT",
url: apiEndpoint,
headers,
data: body,
timeout: 30000,
});
results.push({
status: response.status,
data: response.data,
});
} catch (error: any) {
logger.error(
`❌ API 요청 실패: ${error.response?.status || error.message}`
);
throw error;
}
}
logger.info(`✅ REST API UPDATE 완료: ${results.length}`);
return { results };
}
/**
* DELETE 액션 노드 실행
*/
private static async executeDeleteAction(
node: FlowNode,
inputData: any,
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<any> {
const { targetType } = node.data;
// 🔥 타겟 타입별 분기
switch (targetType) {
case "internal":
return this.executeInternalDelete(node, inputData, context, client);
case "external":
return this.executeExternalDelete(node, inputData, context);
case "api":
return this.executeApiDelete(node, inputData, context);
default:
// 하위 호환성: targetType이 없으면 internal로 간주
logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
return this.executeInternalDelete(node, inputData, context, client);
}
}
/**
* 내부 DB DELETE 실행
*/
private static async executeInternalDelete(
node: FlowNode,
inputData: any,
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<any> {
const { targetTable, whereConditions } = node.data;
logger.info(`🗑️ DELETE 노드 실행: ${targetTable}`);
console.log(
"📥 입력 데이터 타입:",
typeof inputData,
Array.isArray(inputData) ? `배열(${inputData.length}건)` : "단일 객체"
);
if (inputData && inputData.length > 0) {
console.log("📄 첫 번째 입력 데이터:");
console.log(JSON.stringify(inputData[0], null, 2));
console.log("🔑 입력 데이터 필드명:", Object.keys(inputData[0]));
}
// 🔥 트랜잭션 클라이언트가 있으면 사용, 없으면 독립 트랜잭션
const executeDelete = async (txClient: any) => {
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
let deletedCount = 0;
const deletedDataArray: any[] = [];
// 🆕 table-all 모드: 단일 SQL로 일괄 삭제
if (context.currentNodeDataSourceType === "table-all") {
console.log("🚀 table-all 모드: 단일 SQL로 일괄 삭제 시작");
// 첫 번째 데이터를 참조하여 WHERE 절 생성
const firstData = dataArray[0];
// WHERE 조건 (사용자 정의 조건만 사용, PK 자동 추가 안 함)
const whereResult = this.buildWhereClause(whereConditions, firstData, 1);
const sql = `DELETE FROM ${targetTable} ${whereResult.clause} RETURNING *`;
console.log("📝 실행할 SQL (일괄 처리):", sql);
console.log("📊 바인딩 값:", whereResult.values);
const result = await txClient.query(sql, whereResult.values);
deletedCount = result.rowCount || 0;
// 🔥 RETURNING으로 받은 삭제된 데이터 저장
if (result.rows && result.rows.length > 0) {
deletedDataArray.push(...result.rows);
}
logger.info(
`✅ DELETE 완료 (내부 DB, 일괄 처리): ${targetTable}, ${deletedCount}`
);
return deletedDataArray;
}
// 🆕 context-data 모드: 개별 삭제 (PK 자동 추가)
console.log("🎯 context-data 모드: 개별 삭제 시작");
for (const data of dataArray) {
console.log("🔍 WHERE 조건 처리 중...");
// 🔑 Primary Key 자동 추가 (context-data 모드)
console.log("🔑 context-data 모드: Primary Key 자동 추가");
const enhancedWhereConditions = await this.enhanceWhereConditionsWithPK(
whereConditions,
data,
targetTable
);
const whereResult = this.buildWhereClause(enhancedWhereConditions, data, 1);
const sql = `DELETE FROM ${targetTable} ${whereResult.clause} RETURNING *`;
console.log("📝 실행할 SQL:", sql);
console.log("📊 바인딩 값:", whereResult.values);
const result = await txClient.query(sql, whereResult.values);
deletedCount += result.rowCount || 0;
// 🔥 RETURNING으로 받은 삭제된 데이터 저장
if (result.rows && result.rows.length > 0) {
deletedDataArray.push(...result.rows);
}
}
logger.info(
`✅ DELETE 완료 (내부 DB): ${targetTable}, ${deletedCount}`
);
// 🔥 삭제된 데이터 반환 (로그 기록 등에 사용)
return deletedDataArray;
};
// 🔥 클라이언트가 전달되었으면 사용, 없으면 독립 트랜잭션 생성
if (client) {
return executeDelete(client);
} else {
return transaction(executeDelete);
}
}
/**
* 외부 DB DELETE 실행
*/
private static async executeExternalDelete(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const {
externalConnectionId,
externalDbType,
externalTargetTable,
whereConditions,
} = node.data;
if (!externalConnectionId || !externalTargetTable) {
throw new Error("외부 DB 커넥션 또는 테이블이 설정되지 않았습니다.");
}
logger.info(
`🔌 외부 DB DELETE 시작: ${externalDbType} - ${externalTargetTable}`
);
// 외부 DB 커넥터 생성
const connector = await this.createExternalConnector(
externalConnectionId,
externalDbType
);
try {
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
let deletedCount = 0;
const deletedDataArray: any[] = [];
for (const data of dataArray) {
const whereClauses: string[] = [];
const values: any[] = [];
let paramIndex = 1;
// WHERE 조건 생성
whereConditions?.forEach((condition: any) => {
const condValue = data[condition.field];
if (condition.operator === "IS NULL") {
whereClauses.push(`${condition.field} IS NULL`);
} else if (condition.operator === "IS NOT NULL") {
whereClauses.push(`${condition.field} IS NOT NULL`);
} else {
if (externalDbType.toLowerCase() === "oracle") {
whereClauses.push(
`${condition.field} ${condition.operator} :${paramIndex}`
);
} else if (
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
) {
whereClauses.push(`${condition.field} ${condition.operator} ?`);
} else if (externalDbType.toLowerCase() === "mssql") {
whereClauses.push(
`${condition.field} ${condition.operator} @p${paramIndex}`
);
} else {
whereClauses.push(
`${condition.field} ${condition.operator} $${paramIndex}`
);
}
values.push(condValue);
paramIndex++;
}
});
const whereClause =
whereClauses.length > 0 ? `WHERE ${whereClauses.join(" AND ")}` : "";
if (!whereClause) {
throw new Error(
"DELETE 작업에 WHERE 조건이 필요합니다. (전체 삭제 방지)"
);
}
// 🔥 삭제 전에 데이터 조회 (로그 기록 용도)
const selectSql = `SELECT * FROM ${externalTargetTable} ${whereClause}`;
const selectResult = await connector.executeQuery(selectSql, values);
if (selectResult && selectResult.length > 0) {
deletedDataArray.push(...selectResult);
}
// 실제 삭제 수행
const deleteSql = `DELETE FROM ${externalTargetTable} ${whereClause}`;
const result = await connector.executeQuery(deleteSql, values);
deletedCount += result.rowCount || result.affectedRows || 0;
}
// 🔥 Oracle의 경우 명시적 COMMIT
await this.commitExternalTransaction(
connector,
externalDbType,
deletedCount
);
logger.info(
`✅ DELETE 완료 (외부 DB): ${externalTargetTable}, ${deletedCount}`
);
// 🔥 삭제된 데이터 반환
return deletedDataArray;
} catch (error) {
// 🔥 Oracle의 경우 오류 시 ROLLBACK
await this.rollbackExternalTransaction(connector, externalDbType);
throw error;
} finally {
await connector.disconnect();
}
}
/**
* REST API DELETE 실행 (DELETE 요청)
*/
private static async executeApiDelete(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const { apiEndpoint, apiAuthType, apiAuthConfig, apiHeaders } = node.data;
if (!apiEndpoint) {
throw new Error("API 엔드포인트가 설정되지 않았습니다.");
}
logger.info(`🌐 REST API DELETE 시작: ${apiEndpoint}`);
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
const results: any[] = [];
for (const data of dataArray) {
// 헤더 설정
const headers: any = { ...apiHeaders };
// 인증 헤더 추가
if (apiAuthType === "bearer" && apiAuthConfig?.token) {
headers["Authorization"] = `Bearer ${apiAuthConfig.token}`;
} else if (
apiAuthType === "basic" &&
apiAuthConfig?.username &&
apiAuthConfig?.password
) {
const credentials = Buffer.from(
`${apiAuthConfig.username}:${apiAuthConfig.password}`
).toString("base64");
headers["Authorization"] = `Basic ${credentials}`;
} else if (apiAuthType === "apikey" && apiAuthConfig?.apiKey) {
const headerName = apiAuthConfig.apiKeyHeader || "X-API-Key";
headers[headerName] = apiAuthConfig.apiKey;
}
// DELETE는 일반적으로 URL 파라미터 또는 경로에 ID 포함
// 템플릿 변수 치환 지원 (예: /api/users/{{id}})
const url = this.replaceTemplateVariables(apiEndpoint, data);
try {
const response = await axios({
method: "DELETE",
url,
headers,
timeout: 30000,
});
results.push({
status: response.status,
data: response.data,
});
} catch (error: any) {
logger.error(
`❌ API 요청 실패: ${error.response?.status || error.message}`
);
throw error;
}
}
logger.info(`✅ REST API DELETE 완료: ${results.length}`);
return { results };
}
/**
* UPSERT 액션 노드 실행
*/
private static async executeUpsertAction(
node: FlowNode,
inputData: any,
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<any> {
const { targetType } = node.data;
// 🔥 타겟 타입별 분기
switch (targetType) {
case "internal":
return this.executeInternalUpsert(node, inputData, context, client);
case "external":
return this.executeExternalUpsert(node, inputData, context);
case "api":
return this.executeApiUpsert(node, inputData, context);
default:
// 하위 호환성: targetType이 없으면 internal로 간주
logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
return this.executeInternalUpsert(node, inputData, context, client);
}
}
/**
* 내부 DB UPSERT 실행 (로직 기반)
* DB 제약 조건 없이 SELECT → UPDATE or INSERT 방식으로 구현
*/
private static async executeInternalUpsert(
node: FlowNode,
inputData: any,
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<any> {
const { targetTable, fieldMappings, conflictKeys } = node.data;
if (!targetTable || !fieldMappings || fieldMappings.length === 0) {
throw new Error("UPSERT 액션에 필수 설정이 누락되었습니다.");
}
if (!conflictKeys || conflictKeys.length === 0) {
throw new Error("UPSERT 액션에 충돌 키(Conflict Keys)가 필요합니다.");
}
logger.info(`🔀 UPSERT 노드 실행: ${targetTable}`);
console.log(
"📥 입력 데이터 타입:",
typeof inputData,
Array.isArray(inputData) ? `배열(${inputData.length}건)` : "단일 객체"
);
if (inputData && inputData.length > 0) {
console.log("📄 첫 번째 입력 데이터:");
console.log(JSON.stringify(inputData[0], null, 2));
console.log("🔑 입력 데이터 필드명:", Object.keys(inputData[0]));
}
console.log("🔑 충돌 키:", conflictKeys);
// 🔥 트랜잭션 클라이언트가 있으면 사용, 없으면 독립 트랜잭션
const executeUpsert = async (txClient: any) => {
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
let insertedCount = 0;
let updatedCount = 0;
for (const data of dataArray) {
// 1. 충돌 키 값 추출
const conflictKeyValues: Record<string, any> = {};
conflictKeys.forEach((key: string) => {
const mapping = fieldMappings.find((m: any) => m.targetField === key);
if (mapping) {
conflictKeyValues[key] =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
}
});
// 2. 존재 여부 확인 (SELECT)
const whereConditions = conflictKeys
.map((key: string, index: number) => `${key} = $${index + 1}`)
.join(" AND ");
const whereValues = conflictKeys.map(
(key: string) => conflictKeyValues[key]
);
console.log("🔍 존재 여부 확인 - WHERE 조건:", whereConditions);
console.log("🔍 존재 여부 확인 - 바인딩 값:", whereValues);
const checkSql = `SELECT 1 FROM ${targetTable} WHERE ${whereConditions} LIMIT 1`;
const existingRow = await txClient.query(checkSql, whereValues);
if (existingRow.rows.length > 0) {
// 3-A. 존재하면 UPDATE
const setClauses: string[] = [];
const updateValues: any[] = [];
let paramIndex = 1;
fieldMappings.forEach((mapping: any) => {
// 충돌 키가 아닌 필드만 UPDATE
if (!conflictKeys.includes(mapping.targetField)) {
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
setClauses.push(`${mapping.targetField} = $${paramIndex}`);
updateValues.push(value);
paramIndex++;
}
});
// WHERE 조건 생성 (파라미터 인덱스 이어서)
const updateWhereConditions = conflictKeys
.map(
(key: string, index: number) => `${key} = $${paramIndex + index}`
)
.join(" AND ");
// WHERE 조건 값 추가
whereValues.forEach((val: any) => {
updateValues.push(val);
});
const updateSql = `
UPDATE ${targetTable}
SET ${setClauses.join(", ")}
WHERE ${updateWhereConditions}
`;
logger.info(`🔄 UPDATE 실행:`, {
table: targetTable,
conflictKeys,
conflictKeyValues,
sql: updateSql,
values: updateValues,
});
await txClient.query(updateSql, updateValues);
updatedCount++;
} else {
// 3-B. 없으면 INSERT
const columns: string[] = [];
const values: any[] = [];
fieldMappings.forEach((mapping: any) => {
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
columns.push(mapping.targetField);
values.push(value);
});
const placeholders = values.map((_, i) => `$${i + 1}`).join(", ");
const insertSql = `
INSERT INTO ${targetTable} (${columns.join(", ")})
VALUES (${placeholders})
`;
logger.info(` INSERT 실행:`, {
table: targetTable,
conflictKeys,
conflictKeyValues,
});
await txClient.query(insertSql, values);
insertedCount++;
}
}
logger.info(
`✅ UPSERT 완료 (내부 DB): ${targetTable}, INSERT ${insertedCount}건, UPDATE ${updatedCount}`
);
return {
insertedCount,
updatedCount,
totalCount: insertedCount + updatedCount,
};
};
// 🔥 클라이언트가 전달되었으면 사용, 아니면 독립 트랜잭션 생성
if (client) {
return executeUpsert(client);
} else {
return transaction(executeUpsert);
}
}
/**
* 외부 DB UPSERT 실행 (로직 기반)
*/
private static async executeExternalUpsert(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const {
externalConnectionId,
externalDbType,
externalTargetTable,
fieldMappings,
conflictKeys,
} = node.data;
if (!externalConnectionId || !externalTargetTable) {
throw new Error("외부 DB 커넥션 또는 테이블이 설정되지 않았습니다.");
}
if (!fieldMappings || fieldMappings.length === 0) {
throw new Error("UPSERT 액션에 필수 설정이 누락되었습니다.");
}
if (!conflictKeys || conflictKeys.length === 0) {
throw new Error("UPSERT 액션에 충돌 키(Conflict Keys)가 필요합니다.");
}
logger.info(
`🔌 외부 DB UPSERT 시작: ${externalDbType} - ${externalTargetTable}`
);
// 외부 DB 커넥터 생성
const connector = await this.createExternalConnector(
externalConnectionId,
externalDbType
);
try {
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
let insertedCount = 0;
let updatedCount = 0;
for (const data of dataArray) {
// 1. 충돌 키 값 추출
const conflictKeyValues: Record<string, any> = {};
conflictKeys.forEach((key: string) => {
const mapping = fieldMappings.find((m: any) => m.targetField === key);
if (mapping) {
conflictKeyValues[key] =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
}
});
// 2. 존재 여부 확인 (SELECT)
const whereClauses: string[] = [];
const whereValues: any[] = [];
let paramIndex = 1;
conflictKeys.forEach((key: string) => {
if (externalDbType.toLowerCase() === "oracle") {
whereClauses.push(`${key} = :${paramIndex}`);
} else if (
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
) {
whereClauses.push(`${key} = ?`);
} else if (externalDbType.toLowerCase() === "mssql") {
whereClauses.push(`${key} = @p${paramIndex}`);
} else {
whereClauses.push(`${key} = $${paramIndex}`);
}
whereValues.push(conflictKeyValues[key]);
paramIndex++;
});
const checkSql = `SELECT * FROM ${externalTargetTable} WHERE ${whereClauses.join(" AND ")} LIMIT 1`;
const existingRow = await connector.executeQuery(checkSql, whereValues);
const hasExistingRow =
existingRow.rows?.length > 0 || existingRow.length > 0;
if (hasExistingRow) {
// 3-A. 존재하면 UPDATE
const setClauses: string[] = [];
const updateValues: any[] = [];
paramIndex = 1;
fieldMappings.forEach((mapping: any) => {
if (!conflictKeys.includes(mapping.targetField)) {
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
if (externalDbType.toLowerCase() === "oracle") {
setClauses.push(`${mapping.targetField} = :${paramIndex}`);
} else if (
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
) {
setClauses.push(`${mapping.targetField} = ?`);
} else if (externalDbType.toLowerCase() === "mssql") {
setClauses.push(`${mapping.targetField} = @p${paramIndex}`);
} else {
setClauses.push(`${mapping.targetField} = $${paramIndex}`);
}
updateValues.push(value);
paramIndex++;
}
});
// WHERE 조건 생성
const updateWhereClauses: string[] = [];
conflictKeys.forEach((key: string) => {
if (externalDbType.toLowerCase() === "oracle") {
updateWhereClauses.push(`${key} = :${paramIndex}`);
} else if (
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
) {
updateWhereClauses.push(`${key} = ?`);
} else if (externalDbType.toLowerCase() === "mssql") {
updateWhereClauses.push(`${key} = @p${paramIndex}`);
} else {
updateWhereClauses.push(`${key} = $${paramIndex}`);
}
updateValues.push(conflictKeyValues[key]);
paramIndex++;
});
const updateSql = `UPDATE ${externalTargetTable} SET ${setClauses.join(", ")} WHERE ${updateWhereClauses.join(" AND ")}`;
await connector.executeQuery(updateSql, updateValues);
updatedCount++;
} else {
// 3-B. 없으면 INSERT
const columns: string[] = [];
const values: any[] = [];
fieldMappings.forEach((mapping: any) => {
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
columns.push(mapping.targetField);
values.push(value);
});
let insertSql: string;
if (externalDbType.toLowerCase() === "oracle") {
const placeholders = columns.map((_, i) => `:${i + 1}`).join(", ");
insertSql = `INSERT INTO ${externalTargetTable} (${columns.join(", ")}) VALUES (${placeholders})`;
} else if (
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
) {
const placeholders = columns.map(() => "?").join(", ");
insertSql = `INSERT INTO ${externalTargetTable} (${columns.join(", ")}) VALUES (${placeholders})`;
} else if (externalDbType.toLowerCase() === "mssql") {
const placeholders = columns.map((_, i) => `@p${i + 1}`).join(", ");
insertSql = `INSERT INTO ${externalTargetTable} (${columns.join(", ")}) VALUES (${placeholders})`;
} else {
const placeholders = columns.map((_, i) => `$${i + 1}`).join(", ");
insertSql = `INSERT INTO ${externalTargetTable} (${columns.join(", ")}) VALUES (${placeholders})`;
}
await connector.executeQuery(insertSql, values);
insertedCount++;
}
}
// 🔥 Oracle의 경우 명시적 COMMIT
await this.commitExternalTransaction(
connector,
externalDbType,
insertedCount + updatedCount
);
logger.info(
`✅ UPSERT 완료 (외부 DB): ${externalTargetTable}, INSERT ${insertedCount}건, UPDATE ${updatedCount}`
);
return {
insertedCount,
updatedCount,
totalCount: insertedCount + updatedCount,
};
} catch (error) {
// 🔥 Oracle의 경우 오류 시 ROLLBACK
await this.rollbackExternalTransaction(connector, externalDbType);
throw error;
} finally {
await connector.disconnect();
}
}
/**
* REST API UPSERT 실행 (POST/PUT 요청)
* API 응답에 따라 INSERT/UPDATE 판단
*/
private static async executeApiUpsert(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const {
apiEndpoint,
apiMethod,
apiAuthType,
apiAuthConfig,
apiHeaders,
apiBodyTemplate,
fieldMappings,
} = node.data;
if (!apiEndpoint) {
throw new Error("API 엔드포인트가 설정되지 않았습니다.");
}
logger.info(`🌐 REST API UPSERT 시작: ${apiMethod} ${apiEndpoint}`);
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
const results: any[] = [];
for (const data of dataArray) {
// 헤더 설정
const headers: any = { ...apiHeaders };
// 인증 헤더 추가
if (apiAuthType === "bearer" && apiAuthConfig?.token) {
headers["Authorization"] = `Bearer ${apiAuthConfig.token}`;
} else if (
apiAuthType === "basic" &&
apiAuthConfig?.username &&
apiAuthConfig?.password
) {
const credentials = Buffer.from(
`${apiAuthConfig.username}:${apiAuthConfig.password}`
).toString("base64");
headers["Authorization"] = `Basic ${credentials}`;
} else if (apiAuthType === "apikey" && apiAuthConfig?.apiKey) {
const headerName = apiAuthConfig.apiKeyHeader || "X-API-Key";
headers[headerName] = apiAuthConfig.apiKey;
}
if (!headers["Content-Type"]) {
headers["Content-Type"] = "application/json";
}
// 바디 생성
let body: any;
if (apiBodyTemplate) {
body = this.replaceTemplateVariables(apiBodyTemplate, data);
} else if (fieldMappings && fieldMappings.length > 0) {
body = {};
fieldMappings.forEach((mapping: any) => {
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
body[mapping.targetField] = value;
});
} else {
body = data;
}
try {
// UPSERT는 일반적으로 PUT 메서드 사용 (멱등성)
const response = await axios({
method: apiMethod || "PUT",
url: apiEndpoint,
headers,
data: body,
timeout: 30000,
});
results.push({
status: response.status,
data: response.data,
});
} catch (error: any) {
logger.error(
`❌ API 요청 실패: ${error.response?.status || error.message}`
);
throw error;
}
}
logger.info(`✅ REST API UPSERT 완료: ${results.length}`);
return { results };
}
/**
* 조건 노드 실행
*/
private static async executeCondition(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const { conditions, logic } = node.data;
logger.info(
`🔍 조건 노드 실행 - inputData 타입: ${typeof inputData}, 배열 여부: ${Array.isArray(inputData)}, 길이: ${Array.isArray(inputData) ? inputData.length : "N/A"}`
);
logger.info(`🔍 조건 개수: ${conditions?.length || 0}, 로직: ${logic}`);
if (inputData) {
console.log(
"📥 조건 노드 입력 데이터:",
JSON.stringify(inputData, null, 2).substring(0, 500)
);
} else {
console.log("⚠️ 조건 노드 입력 데이터가 없습니다!");
}
// 조건이 없으면 모든 데이터 통과
if (!conditions || conditions.length === 0) {
logger.info("⚠️ 조건이 설정되지 않음 - 모든 데이터 통과");
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
return {
conditionResult: true,
trueData: dataArray,
falseData: [],
allData: dataArray,
};
}
// inputData가 배열인 경우 각 항목을 필터링
if (Array.isArray(inputData)) {
const trueData: any[] = [];
const falseData: any[] = [];
inputData.forEach((item: any) => {
const results = conditions.map((condition: any) => {
const fieldValue = item[condition.field];
let compareValue = condition.value;
if (condition.valueType === "field") {
compareValue = item[condition.value];
logger.info(
`🔄 필드 참조 비교: ${condition.field} (${fieldValue}) vs ${condition.value} (${compareValue})`
);
} else {
logger.info(
`📊 고정값 비교: ${condition.field} (${fieldValue}) vs ${compareValue}`
);
}
return this.evaluateCondition(
fieldValue,
condition.operator,
compareValue
);
});
const result =
logic === "OR"
? results.some((r: boolean) => r)
: results.every((r: boolean) => r);
if (result) {
trueData.push(item);
} else {
falseData.push(item);
}
});
logger.info(
`🔍 조건 필터링 결과: TRUE ${trueData.length}건 / FALSE ${falseData.length}건 (${logic} 로직)`
);
return {
conditionResult: trueData.length > 0,
trueData,
falseData,
allData: inputData,
};
}
// 단일 객체인 경우
const results = conditions.map((condition: any) => {
const fieldValue = inputData[condition.field];
let compareValue = condition.value;
if (condition.valueType === "field") {
compareValue = inputData[condition.value];
logger.info(
`🔄 필드 참조 비교: ${condition.field} (${fieldValue}) vs ${condition.value} (${compareValue})`
);
} else {
logger.info(
`📊 고정값 비교: ${condition.field} (${fieldValue}) vs ${compareValue}`
);
}
return this.evaluateCondition(
fieldValue,
condition.operator,
compareValue
);
});
const result =
logic === "OR"
? results.some((r: boolean) => r)
: results.every((r: boolean) => r);
logger.info(`🔍 조건 평가 결과: ${result} (${logic} 로직)`);
// ⚠️ 조건 노드는 TRUE/FALSE 브랜치를 위한 특별한 처리 필요
// 조건 결과를 저장하고, 원본 데이터는 항상 반환
// 다음 노드에서 sourceHandle을 기반으로 필터링됨
return {
conditionResult: result,
trueData: result ? [inputData] : [],
falseData: result ? [] : [inputData],
allData: [inputData], // 일단 모든 데이터 전달
};
}
/**
* WHERE 절 생성
*/
/**
* 테이블의 Primary Key 컬럼 조회 (내부 DB - PostgreSQL)
*/
private static async getPrimaryKeyColumns(
tableName: string,
schema: string = "public"
): Promise<string[]> {
const sql = `
SELECT a.attname AS column_name
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = $1::regclass
AND i.indisprimary
ORDER BY array_position(i.indkey, a.attnum);
`;
const fullTableName = schema ? `${schema}.${tableName}` : tableName;
try {
const result = await query(sql, [fullTableName]);
const pkColumns = result.map((row: any) => row.column_name);
if (pkColumns.length > 0) {
console.log(`🔑 테이블 ${tableName}의 Primary Key: ${pkColumns.join(", ")}`);
} else {
console.log(`⚠️ 테이블 ${tableName}에 Primary Key가 없습니다`);
}
return pkColumns;
} catch (error) {
console.error(`❌ Primary Key 조회 실패 (${tableName}):`, error);
return [];
}
}
/**
* WHERE 조건에 Primary Key 자동 추가 (컨텍스트 데이터 사용 시)
*
* 테이블의 실제 Primary Key를 자동으로 감지하여 WHERE 조건에 추가
*/
private static async enhanceWhereConditionsWithPK(
whereConditions: any[],
data: any,
tableName: string,
schema: string = "public"
): Promise<any[]> {
if (!data) {
console.log("⚠️ 입력 데이터가 없어 WHERE 조건 자동 추가 불가");
return whereConditions || [];
}
// 🔑 테이블의 실제 Primary Key 컬럼 조회
const pkColumns = await this.getPrimaryKeyColumns(tableName, schema);
if (pkColumns.length === 0) {
console.log(`⚠️ 테이블 ${tableName}에 Primary Key가 없어 자동 추가 불가`);
return whereConditions || [];
}
// 🔍 데이터에 모든 PK 컬럼이 있는지 확인
const missingPKColumns = pkColumns.filter(col =>
data[col] === undefined || data[col] === null
);
if (missingPKColumns.length > 0) {
console.log(
`⚠️ 입력 데이터에 Primary Key 컬럼이 없어 자동 추가 불가: ${missingPKColumns.join(", ")}`
);
return whereConditions || [];
}
// 🔍 이미 WHERE 조건에 모든 PK가 포함되어 있는지 확인
const existingFields = new Set(
(whereConditions || []).map((cond: any) => cond.field)
);
const allPKsExist = pkColumns.every(col =>
existingFields.has(col) || existingFields.has(`${tableName}.${col}`)
);
if (allPKsExist) {
console.log("✅ WHERE 조건에 이미 모든 Primary Key 포함, 추가하지 않음");
return whereConditions || [];
}
// 🔥 Primary Key 조건들을 맨 앞에 추가
const pkConditions = pkColumns.map(col => ({
field: col,
operator: 'EQUALS',
value: data[col]
}));
const enhanced = [...pkConditions, ...(whereConditions || [])];
const pkValues = pkColumns.map(col => `${col} = ${data[col]}`).join(", ");
console.log(`🔑 WHERE 조건에 Primary Key 자동 추가: ${pkValues}`);
return enhanced;
}
private static buildWhereClause(
conditions: any[],
data?: any,
startIndex: number = 1
): { clause: string; values: any[] } {
if (!conditions || conditions.length === 0) {
return { clause: "", values: [] };
}
const values: any[] = [];
const clauses = conditions.map((condition, index) => {
const value = data ? data[condition.field] : condition.value;
values.push(value);
// 연산자를 SQL 문법으로 변환
let sqlOperator = condition.operator;
switch (condition.operator.toUpperCase()) {
case "EQUALS":
sqlOperator = "=";
break;
case "NOT_EQUALS":
case "NOTEQUALS":
sqlOperator = "!=";
break;
case "GREATER_THAN":
case "GREATERTHAN":
sqlOperator = ">";
break;
case "LESS_THAN":
case "LESSTHAN":
sqlOperator = "<";
break;
case "GREATER_THAN_OR_EQUAL":
case "GREATERTHANOREQUAL":
sqlOperator = ">=";
break;
case "LESS_THAN_OR_EQUAL":
case "LESSTHANOREQUAL":
sqlOperator = "<=";
break;
case "LIKE":
sqlOperator = "LIKE";
break;
case "NOT_LIKE":
case "NOTLIKE":
sqlOperator = "NOT LIKE";
break;
case "IN":
sqlOperator = "IN";
break;
case "NOT_IN":
case "NOTIN":
sqlOperator = "NOT IN";
break;
case "IS_NULL":
case "ISNULL":
return `${condition.field} IS NULL`;
case "IS_NOT_NULL":
case "ISNOTNULL":
return `${condition.field} IS NOT NULL`;
default:
// 이미 SQL 문법인 경우 (=, !=, >, < 등)
sqlOperator = condition.operator;
}
return `${condition.field} ${sqlOperator} $${startIndex + index}`;
});
return { clause: `WHERE ${clauses.join(" AND ")}`, values };
}
/**
* 조건 평가
*/
private static evaluateCondition(
fieldValue: any,
operator: string,
expectedValue: any
): boolean {
// NULL 체크
if (operator === "IS_NULL" || operator === "isNull") {
return (
fieldValue === null || fieldValue === undefined || fieldValue === ""
);
}
if (operator === "IS_NOT_NULL" || operator === "isNotNull") {
return (
fieldValue !== null && fieldValue !== undefined && fieldValue !== ""
);
}
// 비교 연산자: 타입 변환
const normalizedOperator = operator.toUpperCase();
switch (normalizedOperator) {
case "EQUALS":
case "=":
return fieldValue == expectedValue; // 느슨한 비교
case "NOT_EQUALS":
case "NOTEQUALS":
case "!=":
return fieldValue != expectedValue;
case "GREATER_THAN":
case "GREATERTHAN":
case ">":
return Number(fieldValue) > Number(expectedValue);
case "LESS_THAN":
case "LESSTHAN":
case "<":
return Number(fieldValue) < Number(expectedValue);
case "GREATER_THAN_OR_EQUAL":
case "GREATERTHANOREQUAL":
case ">=":
return Number(fieldValue) >= Number(expectedValue);
case "LESS_THAN_OR_EQUAL":
case "LESSTHANOREQUAL":
case "<=":
return Number(fieldValue) <= Number(expectedValue);
case "LIKE":
case "CONTAINS":
return String(fieldValue)
.toLowerCase()
.includes(String(expectedValue).toLowerCase());
case "NOT_LIKE":
case "NOTLIKE":
return !String(fieldValue)
.toLowerCase()
.includes(String(expectedValue).toLowerCase());
case "IN":
if (Array.isArray(expectedValue)) {
return expectedValue.includes(fieldValue);
}
// 쉼표로 구분된 문자열
const inValues = String(expectedValue)
.split(",")
.map((v) => v.trim());
return inValues.includes(String(fieldValue));
case "NOT_IN":
case "NOTIN":
if (Array.isArray(expectedValue)) {
return !expectedValue.includes(fieldValue);
}
const notInValues = String(expectedValue)
.split(",")
.map((v) => v.trim());
return !notInValues.includes(String(fieldValue));
default:
logger.warn(`⚠️ 지원되지 않는 연산자: ${operator}`);
return false;
}
}
/**
* 실행 결과 생성
*/
private static generateExecutionResult(
nodes: FlowNode[],
context: ExecutionContext,
executionTime: number
): ExecutionResult {
const nodeSummaries: NodeExecutionSummary[] = nodes.map((node) => {
const result = context.nodeResults.get(node.id);
return {
nodeId: node.id,
nodeName: node.data.displayName || node.id,
nodeType: node.type,
status: result?.status || "pending",
duration: result?.endTime
? result.endTime - result.startTime
: undefined,
error: result?.error?.message,
};
});
const summary = {
total: nodes.length,
success: nodeSummaries.filter((n) => n.status === "success").length,
failed: nodeSummaries.filter((n) => n.status === "failed").length,
skipped: nodeSummaries.filter((n) => n.status === "skipped").length,
};
const success = summary.failed === 0;
// 실패한 노드 상세 로깅
if (!success) {
const failedNodes = nodeSummaries.filter((n) => n.status === "failed");
logger.error(
`❌ 실패한 노드들:`,
failedNodes.map((n) => ({
nodeId: n.nodeId,
nodeName: n.nodeName,
nodeType: n.nodeType,
error: n.error,
}))
);
}
return {
success,
message: success
? `플로우 실행 성공 (${summary.success}/${summary.total})`
: `플로우 실행 부분 실패 (성공: ${summary.success}, 실패: ${summary.failed})`,
executionTime,
nodes: nodeSummaries,
summary,
};
}
/**
* 데이터 변환 노드 실행
*/
private static async executeDataTransform(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any[]> {
const { transformations } = node.data;
if (
!transformations ||
!Array.isArray(transformations) ||
transformations.length === 0
) {
logger.warn(`⚠️ 데이터 변환 노드에 변환 규칙이 없습니다: ${node.id}`);
return Array.isArray(inputData) ? inputData : [inputData];
}
// inputData를 배열로 정규화
const rows = Array.isArray(inputData) ? inputData : [inputData];
logger.info(
`🔄 데이터 변환 시작: ${rows.length}개 행, ${transformations.length}개 변환`
);
// 각 변환 규칙을 순차적으로 적용
let transformedRows = rows;
for (const transform of transformations) {
const transformType = transform.type;
logger.info(` 🔹 변환 적용: ${transformType}`);
transformedRows = this.applyTransformation(transformedRows, transform);
}
logger.info(`✅ 데이터 변환 완료: ${transformedRows.length}개 행`);
return transformedRows;
}
/**
* 단일 변환 규칙 적용
*/
private static applyTransformation(rows: any[], transform: any): any[] {
const {
type,
sourceField,
targetField,
delimiter,
separator,
searchValue,
replaceValue,
splitIndex,
castType,
expression,
} = transform;
// 타겟 필드 결정 (비어있으면 소스 필드 사용 = in-place)
const actualTargetField = targetField || sourceField;
switch (type) {
case "UPPERCASE":
return rows.map((row) => ({
...row,
[actualTargetField]:
row[sourceField]?.toString().toUpperCase() || row[sourceField],
}));
case "LOWERCASE":
return rows.map((row) => ({
...row,
[actualTargetField]:
row[sourceField]?.toString().toLowerCase() || row[sourceField],
}));
case "TRIM":
return rows.map((row) => ({
...row,
[actualTargetField]:
row[sourceField]?.toString().trim() || row[sourceField],
}));
case "EXPLODE":
return this.applyExplode(
rows,
sourceField,
actualTargetField,
delimiter || ","
);
case "CONCAT":
return rows.map((row) => {
const value1 = row[sourceField] || "";
// CONCAT은 여러 필드를 합칠 수 있지만, 단순화하여 expression 사용
const value2 = expression || "";
return {
...row,
[actualTargetField]: `${value1}${separator || ""}${value2}`,
};
});
case "SPLIT":
return rows.map((row) => {
const value = row[sourceField]?.toString() || "";
const parts = value.split(delimiter || ",");
const index = splitIndex !== undefined ? splitIndex : 0;
return {
...row,
[actualTargetField]: parts[index] || "",
};
});
case "REPLACE":
return rows.map((row) => {
const value = row[sourceField]?.toString() || "";
const replaced = value.replace(
new RegExp(searchValue || "", "g"),
replaceValue || ""
);
return {
...row,
[actualTargetField]: replaced,
};
});
case "CAST":
return rows.map((row) => {
const value = row[sourceField];
let castedValue = value;
switch (castType) {
case "string":
castedValue = value?.toString() || "";
break;
case "number":
castedValue = parseFloat(value) || 0;
break;
case "boolean":
castedValue = Boolean(value);
break;
case "date":
castedValue = new Date(value);
break;
}
return {
...row,
[actualTargetField]: castedValue,
};
});
case "FORMAT":
case "CALCULATE":
case "JSON_EXTRACT":
case "CUSTOM":
// 표현식 기반 변환 (현재는 단순 구현)
logger.warn(`⚠️ ${type} 변환은 아직 완전히 구현되지 않았습니다.`);
return rows.map((row) => ({
...row,
[actualTargetField]: row[sourceField] || "",
}));
default:
logger.warn(`⚠️ 지원하지 않는 변환 타입: ${type}`);
return rows;
}
}
/**
* EXPLODE 변환: 1개 행을 N개 행으로 확장
*/
private static applyExplode(
rows: any[],
sourceField: string,
targetField: string,
delimiter: string
): any[] {
const expandedRows: any[] = [];
for (const row of rows) {
const value = row[sourceField];
if (!value) {
// 값이 없으면 원본 행 유지
expandedRows.push(row);
continue;
}
// 문자열을 구분자로 분리
const values = value
.toString()
.split(delimiter)
.map((v: string) => v.trim());
// 각 값마다 새 행 생성
for (const val of values) {
expandedRows.push({
...row, // 다른 필드들은 복제
[targetField]: val, // 타겟 필드에 분리된 값 저장
});
}
}
logger.info(
` 💥 EXPLODE: ${rows.length}개 행 → ${expandedRows.length}개 행`
);
return expandedRows;
}
/**
* 🔥 액션 노드 여부 확인
*/
private static isActionNode(nodeType: NodeType): boolean {
return [
"insertAction",
"updateAction",
"deleteAction",
"upsertAction",
].includes(nodeType);
}
/**
* 집계 노드 실행 (SUM, COUNT, AVG, MIN, MAX 등)
*/
private static async executeAggregate(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any[]> {
const { groupByFields = [], aggregations = [], havingConditions = [] } = node.data;
logger.info(`📊 집계 노드 실행: ${node.data.displayName || node.id}`);
// 입력 데이터가 없으면 빈 배열 반환
if (!inputData || !Array.isArray(inputData) || inputData.length === 0) {
logger.warn("⚠️ 집계할 입력 데이터가 없습니다.");
logger.warn(`⚠️ inputData 타입: ${typeof inputData}, 값: ${JSON.stringify(inputData)?.substring(0, 200)}`);
return [];
}
logger.info(`📥 입력 데이터: ${inputData.length}`);
logger.info(`📥 입력 데이터 샘플: ${JSON.stringify(inputData[0])?.substring(0, 300)}`);
logger.info(`📊 그룹 기준: ${groupByFields.length > 0 ? groupByFields.map((f: any) => f.field).join(", ") : "전체"}`);
logger.info(`📊 집계 연산: ${aggregations.length}`);
// 그룹화 수행
const groups = new Map<string, any[]>();
for (const row of inputData) {
// 그룹 키 생성
const groupKey = groupByFields.length > 0
? groupByFields.map((f: any) => String(row[f.field] ?? "")).join("|||")
: "__ALL__";
if (!groups.has(groupKey)) {
groups.set(groupKey, []);
}
groups.get(groupKey)!.push(row);
}
logger.info(`📊 그룹 수: ${groups.size}`);
// 디버깅: 각 그룹의 데이터 출력
for (const [groupKey, groupRows] of groups) {
logger.info(`📊 그룹 [${groupKey}]: ${groupRows.length}건, inbound_qty 합계: ${groupRows.reduce((sum, row) => sum + parseFloat(row.inbound_qty || 0), 0)}`);
}
// 각 그룹에 대해 집계 수행
const results: any[] = [];
for (const [groupKey, groupRows] of groups) {
const resultRow: any = {};
// 그룹 기준 필드값 추가
if (groupByFields.length > 0) {
const keyValues = groupKey.split("|||");
groupByFields.forEach((field: any, idx: number) => {
resultRow[field.field] = keyValues[idx];
});
}
// 각 집계 연산 수행
for (const agg of aggregations) {
const { sourceField, function: aggFunc, outputField } = agg;
if (!outputField) continue;
let aggregatedValue: any;
switch (aggFunc) {
case "SUM":
aggregatedValue = groupRows.reduce((sum: number, row: any) => {
const val = parseFloat(row[sourceField]);
return sum + (isNaN(val) ? 0 : val);
}, 0);
break;
case "COUNT":
aggregatedValue = groupRows.length;
break;
case "AVG":
const sum = groupRows.reduce((acc: number, row: any) => {
const val = parseFloat(row[sourceField]);
return acc + (isNaN(val) ? 0 : val);
}, 0);
aggregatedValue = groupRows.length > 0 ? sum / groupRows.length : 0;
break;
case "MIN":
aggregatedValue = groupRows.reduce((min: number | null, row: any) => {
const val = parseFloat(row[sourceField]);
if (isNaN(val)) return min;
return min === null ? val : Math.min(min, val);
}, null);
break;
case "MAX":
aggregatedValue = groupRows.reduce((max: number | null, row: any) => {
const val = parseFloat(row[sourceField]);
if (isNaN(val)) return max;
return max === null ? val : Math.max(max, val);
}, null);
break;
case "FIRST":
aggregatedValue = groupRows.length > 0 ? groupRows[0][sourceField] : null;
break;
case "LAST":
aggregatedValue = groupRows.length > 0 ? groupRows[groupRows.length - 1][sourceField] : null;
break;
default:
logger.warn(`⚠️ 지원하지 않는 집계 함수: ${aggFunc}`);
aggregatedValue = null;
}
resultRow[outputField] = aggregatedValue;
logger.info(` ${aggFunc}(${sourceField}) → ${outputField}: ${aggregatedValue}`);
}
results.push(resultRow);
}
// HAVING 조건 적용 (집계 후 필터링)
let filteredResults = results;
if (havingConditions && havingConditions.length > 0) {
filteredResults = results.filter((row) => {
return havingConditions.every((condition: any) => {
const fieldValue = row[condition.field];
const compareValue = parseFloat(condition.value);
switch (condition.operator) {
case "=":
return fieldValue === compareValue;
case "!=":
return fieldValue !== compareValue;
case ">":
return fieldValue > compareValue;
case ">=":
return fieldValue >= compareValue;
case "<":
return fieldValue < compareValue;
case "<=":
return fieldValue <= compareValue;
default:
return true;
}
});
});
logger.info(`📊 HAVING 필터링: ${results.length}건 → ${filteredResults.length}`);
}
logger.info(`✅ 집계 완료: ${filteredResults.length}건 결과`);
// 결과 샘플 출력
if (filteredResults.length > 0) {
logger.info(`📄 결과 샘플:`, JSON.stringify(filteredResults[0], null, 2));
}
return filteredResults;
}
}