ERP-node/backend-node/src/services/nodeFlowExecutionService.ts

4471 lines
134 KiB
TypeScript
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* 노드 플로우 실행 엔진
*
* 기능:
* - 위상 정렬 (Topological Sort)
* - 레벨별 병렬 실행 (Promise.allSettled)
* - 독립 트랜잭션 처리
* - 연쇄 중단 (부모 실패 시 자식 스킵)
*/
import { query, queryOne, transaction } from "../database/db";
import { logger } from "../utils/logger";
import axios from "axios";
// ===== 타입 정의 =====
export interface FlowNode {
id: string;
type: NodeType;
position?: { x: number; y: number };
data: NodeData;
}
export type NodeType =
| "tableSource"
| "externalDBSource"
| "restAPISource"
| "condition"
| "dataTransform"
| "aggregate"
| "formulaTransform" // 수식 변환 노드
| "insertAction"
| "updateAction"
| "deleteAction"
| "upsertAction"
| "emailAction" // 이메일 발송 액션
| "scriptAction" // 스크립트 실행 액션
| "httpRequestAction" // HTTP 요청 액션
| "comment"
| "log";
export interface NodeData {
displayName?: string;
[key: string]: any;
}
export interface FlowEdge {
id: string;
source: string;
target: string;
sourceHandle?: string;
targetHandle?: string;
}
export interface ExecutionContext {
sourceData?: any[]; // 외부에서 주입된 데이터 (선택된 행 또는 폼 데이터)
dataSourceType?: string; // "table-selection" | "form" | "none"
nodeResults: Map<string, NodeResult>;
executionOrder: string[];
buttonContext?: ButtonContext;
// 🆕 현재 실행 중인 소스 노드의 dataSourceType (context-data | table-all)
currentNodeDataSourceType?: string;
}
export interface ButtonContext {
buttonId: string;
screenId?: number;
companyCode?: string;
userId?: string;
formData?: Record<string, any>;
selectedRowsData?: Record<string, any>[];
}
export interface NodeResult {
nodeId: string;
status: "pending" | "success" | "failed" | "skipped";
data?: any;
error?: Error;
startTime: number;
endTime?: number;
}
export interface ExecutionResult {
success: boolean;
message: string;
executionTime: number;
nodes: NodeExecutionSummary[];
summary: {
total: number;
success: number;
failed: number;
skipped: number;
};
}
export interface NodeExecutionSummary {
nodeId: string;
nodeName: string;
nodeType: NodeType;
status: "success" | "failed" | "skipped" | "pending";
duration?: number;
error?: string;
}
// ===== 메인 실행 서비스 =====
export class NodeFlowExecutionService {
/**
* 플로우 실행 메인 함수
*/
static async executeFlow(
flowId: number,
contextData: Record<string, any>
): Promise<ExecutionResult> {
const startTime = Date.now();
try {
logger.info(`🚀 플로우 실행 시작: flowId=${flowId}`);
// 🔍 디버깅: contextData 상세 로그
logger.info(`🔍 contextData 상세:`, {
directCompanyCode: contextData.companyCode,
nestedCompanyCode: contextData.context?.companyCode,
directUserId: contextData.userId,
nestedUserId: contextData.context?.userId,
contextKeys: Object.keys(contextData),
nestedContextKeys: contextData.context
? Object.keys(contextData.context)
: "no nested context",
});
// 1. 플로우 데이터 조회
const flow = await queryOne<{
flow_id: number;
flow_name: string;
flow_data: any;
}>(
`SELECT flow_id, flow_name, flow_data FROM node_flows WHERE flow_id = $1`,
[flowId]
);
if (!flow) {
throw new Error(`플로우를 찾을 수 없습니다: flowId=${flowId}`);
}
const flowData =
typeof flow.flow_data === "string"
? JSON.parse(flow.flow_data)
: flow.flow_data;
const { nodes, edges } = flowData;
logger.info(`📊 플로우 정보:`, {
flowName: flow.flow_name,
nodeCount: nodes.length,
edgeCount: edges.length,
});
// 2. 실행 컨텍스트 준비
const context: ExecutionContext = {
sourceData: contextData.sourceData || [],
dataSourceType: contextData.dataSourceType || "none",
nodeResults: new Map(),
executionOrder: [],
buttonContext: {
buttonId:
contextData.buttonId || contextData.context?.buttonId || "unknown",
screenId: contextData.screenId || contextData.context?.screenId,
companyCode:
contextData.companyCode || contextData.context?.companyCode,
userId: contextData.userId || contextData.context?.userId,
formData: contextData.formData || contextData.context?.formData,
selectedRowsData:
contextData.selectedRowsData ||
contextData.context?.selectedRowsData,
},
};
logger.info(`📦 실행 컨텍스트:`, {
dataSourceType: context.dataSourceType,
sourceDataCount: context.sourceData?.length || 0,
buttonContext: context.buttonContext,
});
// 3. 위상 정렬
const levels = this.topologicalSort(nodes, edges);
logger.info(`📋 실행 순서 (레벨별):`, levels);
// 4. 🔥 전체 플로우를 하나의 트랜잭션으로 실행
let result: ExecutionResult;
try {
result = await transaction(async (client) => {
// 🔥 사용자 ID 세션 변수 설정 (트리거용)
const userId = context.buttonContext?.userId || "system";
await client.query("SELECT set_config('app.user_id', $1, true)", [
userId,
]);
// 트랜잭션 내에서 레벨별 실행
for (const level of levels) {
await this.executeLevel(level, nodes, edges, context, client);
}
// 5. 결과 생성
const executionTime = Date.now() - startTime;
const executionResult = this.generateExecutionResult(
nodes,
context,
executionTime
);
// 실패한 액션 노드가 있으면 롤백
const failedActionNodes = Array.from(
context.nodeResults.values()
).filter(
(result) =>
result.status === "failed" &&
nodes.find(
(n: FlowNode) =>
n.id === result.nodeId && this.isActionNode(n.type)
)
);
if (failedActionNodes.length > 0) {
logger.warn(
`🔄 액션 노드 실패 감지 (${failedActionNodes.length}개), 트랜잭션 롤백`
);
throw new Error(
`액션 노드 실패: ${failedActionNodes.map((n) => n.nodeId).join(", ")}`
);
}
return executionResult;
});
logger.info(`✅ 플로우 실행 완료:`, result.summary);
return result;
} catch (error) {
logger.error(`❌ 플로우 실행 실패, 모든 변경사항 롤백됨:`, error);
throw error;
}
} catch (error) {
logger.error(`❌ 플로우 실행 실패:`, error);
throw error;
}
}
/**
* 소스 데이터 준비
*/
private static prepareSourceData(contextData: Record<string, any>): any[] {
const { controlDataSource, formData, selectedRowsData } = contextData;
switch (controlDataSource) {
case "form":
return formData ? [formData] : [];
case "table-selection":
return selectedRowsData || [];
case "both":
return [
{ source: "form", data: formData },
{ source: "table", data: selectedRowsData },
];
default:
return formData ? [formData] : [];
}
}
/**
* 위상 정렬 (Topological Sort)
* DAG(Directed Acyclic Graph)를 레벨별로 그룹화
*/
private static topologicalSort(
nodes: FlowNode[],
edges: FlowEdge[]
): string[][] {
const levels: string[][] = [];
const inDegree = new Map<string, number>();
const adjacency = new Map<string, string[]>();
// 초기화
nodes.forEach((node) => {
inDegree.set(node.id, 0);
adjacency.set(node.id, []);
});
// 진입 차수 계산
edges.forEach((edge) => {
inDegree.set(edge.target, (inDegree.get(edge.target) || 0) + 1);
adjacency.get(edge.source)?.push(edge.target);
});
// 레벨별 분류
let currentLevel = nodes
.filter((node) => inDegree.get(node.id) === 0)
.map((node) => node.id);
while (currentLevel.length > 0) {
levels.push([...currentLevel]);
const nextLevel: string[] = [];
currentLevel.forEach((nodeId) => {
const neighbors = adjacency.get(nodeId) || [];
neighbors.forEach((neighbor) => {
const newDegree = (inDegree.get(neighbor) || 1) - 1;
inDegree.set(neighbor, newDegree);
if (newDegree === 0) {
nextLevel.push(neighbor);
}
});
});
currentLevel = nextLevel;
}
return levels;
}
/**
* 레벨 내 노드 병렬 실행
*/
private static async executeLevel(
nodeIds: string[],
nodes: FlowNode[],
edges: FlowEdge[],
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<void> {
logger.info(`⏳ 레벨 실행 시작: ${nodeIds.length}개 노드`);
// Promise.allSettled로 병렬 실행
const results = await Promise.allSettled(
nodeIds.map((nodeId) =>
this.executeNode(nodeId, nodes, edges, context, client)
)
);
// 결과 저장
results.forEach((result, index) => {
const nodeId = nodeIds[index];
if (result.status === "fulfilled") {
context.nodeResults.set(nodeId, result.value);
context.executionOrder.push(nodeId);
} else {
context.nodeResults.set(nodeId, {
nodeId,
status: "failed",
error: result.reason,
startTime: Date.now(),
endTime: Date.now(),
});
}
});
logger.info(`✅ 레벨 실행 완료`);
}
/**
* 개별 노드 실행
*/
private static async executeNode(
nodeId: string,
nodes: FlowNode[],
edges: FlowEdge[],
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<NodeResult> {
const startTime = Date.now();
const node = nodes.find((n) => n.id === nodeId);
if (!node) {
throw new Error(`노드를 찾을 수 없습니다: ${nodeId}`);
}
logger.info(`🔄 노드 실행 시작: ${nodeId} (${node.type})`);
// 1. 부모 노드 상태 확인 (연쇄 중단)
const parents = this.getParentNodes(nodeId, edges);
const parentFailed = parents.some((parentId) => {
const parentResult = context.nodeResults.get(parentId);
return parentResult?.status === "failed";
});
if (parentFailed) {
logger.warn(`⏭️ 노드 스킵 (부모 실패): ${nodeId}`);
return {
nodeId,
status: "skipped",
error: new Error("Parent node failed"),
startTime,
endTime: Date.now(),
};
}
// 2. 입력 데이터 준비
const inputData = this.prepareInputData(nodeId, parents, edges, context);
// 3. 노드 타입별 실행
try {
const result = await this.executeNodeByType(
node,
inputData,
context,
client
);
logger.info(`✅ 노드 실행 성공: ${nodeId}`);
return {
nodeId,
status: "success",
data: result,
startTime,
endTime: Date.now(),
};
} catch (error) {
logger.error(`❌ 노드 실행 실패: ${nodeId}`, error);
return {
nodeId,
status: "failed",
error: error as Error,
startTime,
endTime: Date.now(),
};
}
}
/**
* 부모 노드 목록 조회
*/
private static getParentNodes(nodeId: string, edges: FlowEdge[]): string[] {
return edges
.filter((edge) => edge.target === nodeId)
.map((edge) => edge.source);
}
/**
* 입력 데이터 준비
*/
private static prepareInputData(
nodeId: string,
parents: string[],
edges: FlowEdge[],
context: ExecutionContext
): any {
if (parents.length === 0) {
// 소스 노드: 원본 데이터 사용
return context.sourceData;
} else if (parents.length === 1) {
// 단일 부모: 부모의 결과 데이터 전달
const parentId = parents[0];
const parentResult = context.nodeResults.get(parentId);
let data = parentResult?.data || context.sourceData;
// 🔥 조건 노드에서 온 데이터인 경우 sourceHandle 확인
const edge = edges.find(
(e) => e.source === parentId && e.target === nodeId
);
if (
edge?.sourceHandle &&
data &&
typeof data === "object" &&
"conditionResult" in data
) {
// 조건 노드의 결과 객체
if (edge.sourceHandle === "true") {
logger.info(
`✅ TRUE 브랜치 데이터 사용: ${data.trueData?.length || 0}`
);
return data.trueData || [];
} else if (edge.sourceHandle === "false") {
logger.info(
`✅ FALSE 브랜치 데이터 사용: ${data.falseData?.length || 0}`
);
return data.falseData || [];
} else {
// sourceHandle이 없거나 다른 값이면 allData 사용
return data.allData || data;
}
}
return data;
} else {
// 다중 부모: 모든 부모의 데이터 병합
const allData: any[] = [];
parents.forEach((parentId) => {
const parentResult = context.nodeResults.get(parentId);
let data = parentResult?.data || context.sourceData;
// 🔥 조건 노드에서 온 데이터인 경우 sourceHandle 확인
const edge = edges.find(
(e) => e.source === parentId && e.target === nodeId
);
if (
edge?.sourceHandle &&
data &&
typeof data === "object" &&
"conditionResult" in data
) {
// 조건 노드의 결과 객체
if (edge.sourceHandle === "true") {
data = data.trueData || [];
} else if (edge.sourceHandle === "false") {
data = data.falseData || [];
} else {
data = data.allData || data;
}
}
// 배열이면 펼쳐서 추가
if (Array.isArray(data)) {
allData.push(...data);
} else {
allData.push(data);
}
});
logger.info(
`🔗 다중 부모 병합: ${parents.length}개 부모, 총 ${allData.length}건 데이터`
);
return allData;
}
}
/**
* 노드 타입별 실행 로직
*/
private static async executeNodeByType(
node: FlowNode,
inputData: any,
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<any> {
switch (node.type) {
case "tableSource":
return this.executeTableSource(node, context);
case "externalDBSource":
return this.executeExternalDBSource(node, context);
case "restAPISource":
return this.executeRestAPISource(node, context);
case "dataTransform":
return this.executeDataTransform(node, inputData, context);
case "aggregate":
return this.executeAggregate(node, inputData, context);
case "formulaTransform":
return this.executeFormulaTransform(node, inputData, context);
case "insertAction":
return this.executeInsertAction(node, inputData, context, client);
case "updateAction":
return this.executeUpdateAction(node, inputData, context, client);
case "deleteAction":
return this.executeDeleteAction(node, inputData, context, client);
case "upsertAction":
return this.executeUpsertAction(node, inputData, context, client);
case "condition":
return this.executeCondition(node, inputData, context);
case "emailAction":
return this.executeEmailAction(node, inputData, context);
case "scriptAction":
return this.executeScriptAction(node, inputData, context);
case "httpRequestAction":
return this.executeHttpRequestAction(node, inputData, context);
case "comment":
case "log":
// 로그/코멘트는 실행 없이 통과
logger.info(`📝 ${node.type}: ${node.data.displayName || node.id}`);
return { message: "Logged" };
default:
logger.warn(`⚠️ 지원하지 않는 노드 타입: ${node.type}`);
return { message: "Unsupported node type" };
}
}
/**
* REST API 소스 노드 실행
*/
private static async executeRestAPISource(
node: FlowNode,
context: ExecutionContext
): Promise<any[]> {
const {
url,
method = "GET",
headers = {},
body,
timeout = 30000,
responseMapping,
authentication,
} = node.data;
if (!url) {
throw new Error("REST API URL이 설정되지 않았습니다.");
}
logger.info(`🌐 REST API 호출: ${method} ${url}`);
try {
// 헤더 설정
const requestHeaders: any = { ...headers };
// 인증 헤더 추가
if (authentication) {
if (authentication.type === "bearer" && authentication.token) {
requestHeaders["Authorization"] = `Bearer ${authentication.token}`;
} else if (
authentication.type === "basic" &&
authentication.username &&
authentication.password
) {
const credentials = Buffer.from(
`${authentication.username}:${authentication.password}`
).toString("base64");
requestHeaders["Authorization"] = `Basic ${credentials}`;
} else if (authentication.type === "apikey" && authentication.token) {
const headerName = authentication.apiKeyHeader || "X-API-Key";
requestHeaders[headerName] = authentication.token;
}
}
if (!requestHeaders["Content-Type"]) {
requestHeaders["Content-Type"] = "application/json";
}
// API 호출
const response = await axios({
method: method.toLowerCase(),
url,
headers: requestHeaders,
data: body,
timeout,
});
logger.info(`✅ REST API 응답 수신: ${response.status}`);
let responseData = response.data;
// 🔥 표준 API 응답 형식 자동 감지 { success, message, data }
if (
!responseMapping &&
responseData &&
typeof responseData === "object" &&
"success" in responseData &&
"data" in responseData
) {
logger.info("🔍 표준 API 응답 형식 감지, data 속성 자동 추출");
responseData = responseData.data;
}
// responseMapping이 있으면 해당 경로의 데이터 추출
if (responseMapping && responseData) {
logger.info(`🔍 응답 매핑 적용: ${responseMapping}`);
const path = responseMapping.split(".");
for (const key of path) {
if (
responseData &&
typeof responseData === "object" &&
key in responseData
) {
responseData = responseData[key];
} else {
logger.warn(
`⚠️ 응답 매핑 경로를 찾을 수 없습니다: ${responseMapping}`
);
break;
}
}
}
// 배열이 아니면 배열로 변환
if (!Array.isArray(responseData)) {
logger.info("🔄 단일 객체를 배열로 변환");
responseData = [responseData];
}
logger.info(`📦 REST API 데이터 ${responseData.length}건 반환`);
// 첫 번째 데이터 샘플 상세 로깅
if (responseData.length > 0) {
console.log("🔍 REST API 응답 데이터 샘플 (첫 번째 항목):");
console.log(JSON.stringify(responseData[0], null, 2));
console.log("🔑 사용 가능한 필드명:", Object.keys(responseData[0]));
}
return responseData;
} catch (error: any) {
logger.error(`❌ REST API 호출 실패:`, error.message);
throw new Error(`REST API 호출 실패: ${error.message}`);
}
}
/**
* 외부 DB 소스 노드 실행
*/
private static async executeExternalDBSource(
node: FlowNode,
context: ExecutionContext
): Promise<any[]> {
const { connectionId, tableName, schema, whereConditions, dataSourceType } =
node.data;
// 🆕 노드의 dataSourceType 확인 (기본값: context-data)
const nodeDataSourceType = dataSourceType || "context-data";
// 🆕 ExecutionContext에 현재 소스 노드의 dataSourceType 저장
context.currentNodeDataSourceType = nodeDataSourceType;
logger.info(
`🔌 외부 DB 소스 노드 실행: ${connectionId}.${tableName}, dataSourceType=${nodeDataSourceType}`
);
// 1. context-data 모드: 외부에서 주입된 데이터 사용
if (nodeDataSourceType === "context-data") {
if (
context.sourceData &&
Array.isArray(context.sourceData) &&
context.sourceData.length > 0
) {
logger.info(
`📊 컨텍스트 데이터 사용: ${context.dataSourceType}, ${context.sourceData.length}`
);
return context.sourceData;
}
logger.warn(
`⚠️ context-data 모드이지만 전달된 데이터가 없습니다. 빈 배열 반환.`
);
return [];
}
// 2. table-all 모드: 외부 DB 테이블 전체 데이터 조회
if (nodeDataSourceType === "table-all") {
if (!connectionId || !tableName) {
throw new Error(
"외부 DB 연결 정보 또는 테이블명이 설정되지 않았습니다."
);
}
try {
// 연결 풀 서비스 임포트 (동적 임포트로 순환 참조 방지)
const { ExternalDbConnectionPoolService } = await import(
"./externalDbConnectionPoolService"
);
const poolService = ExternalDbConnectionPoolService.getInstance();
// 스키마 접두사 처리
const schemaPrefix = schema ? `${schema}.` : "";
const fullTableName = `${schemaPrefix}${tableName}`;
// WHERE 절 생성
let sql = `SELECT * FROM ${fullTableName}`;
let params: any[] = [];
if (whereConditions && whereConditions.length > 0) {
const whereResult = this.buildWhereClause(whereConditions);
sql += ` ${whereResult.clause}`;
params = whereResult.values;
}
logger.info(`📊 외부 DB 쿼리 실행: ${sql}`);
// 연결 풀을 통해 쿼리 실행
const result = await poolService.executeQuery(
connectionId,
sql,
params
);
logger.info(
`✅ 외부 DB 전체 데이터 조회 완료: ${tableName}, ${result.length}`
);
return result;
} catch (error: any) {
logger.error(`❌ 외부 DB 소스 조회 실패:`, error);
throw new Error(
`외부 DB 조회 실패 (연결 ID: ${connectionId}): ${error.message}`
);
}
}
// 3. 알 수 없는 모드 (기본값으로 처리)
logger.warn(
`⚠️ 알 수 없는 dataSourceType: ${nodeDataSourceType}, context-data로 처리`
);
if (
context.sourceData &&
Array.isArray(context.sourceData) &&
context.sourceData.length > 0
) {
return context.sourceData;
}
return [];
}
/**
* 테이블 소스 노드 실행
*/
private static async executeTableSource(
node: FlowNode,
context: ExecutionContext
): Promise<any[]> {
const { tableName, schema, whereConditions, dataSourceType } = node.data;
// 🆕 노드의 dataSourceType 확인 (기본값: context-data)
const nodeDataSourceType = dataSourceType || "context-data";
// 🆕 ExecutionContext에 현재 소스 노드의 dataSourceType 저장
context.currentNodeDataSourceType = nodeDataSourceType;
logger.info(
`📊 테이블 소스 노드 실행: ${tableName}, dataSourceType=${nodeDataSourceType}`
);
// 1. context-data 모드: 외부에서 주입된 데이터 사용
if (nodeDataSourceType === "context-data") {
if (
context.sourceData &&
Array.isArray(context.sourceData) &&
context.sourceData.length > 0
) {
logger.info(
`📊 컨텍스트 데이터 사용: ${context.dataSourceType}, ${context.sourceData.length}`
);
return context.sourceData;
}
logger.warn(
`⚠️ context-data 모드이지만 전달된 데이터가 없습니다. 빈 배열 반환.`
);
return [];
}
// 2. table-all 모드: 테이블 전체 데이터 조회
if (nodeDataSourceType === "table-all") {
if (!tableName) {
logger.warn("⚠️ 테이블 소스 노드에 테이블명이 없습니다.");
return [];
}
const schemaPrefix = schema ? `${schema}.` : "";
const whereResult = whereConditions
? this.buildWhereClause(whereConditions)
: { clause: "", values: [] };
const sql = `SELECT * FROM ${schemaPrefix}${tableName} ${whereResult.clause}`;
logger.info(`📊 테이블 전체 데이터 조회 SQL: ${sql}`);
const result = await query(sql, whereResult.values);
logger.info(
`📊 테이블 전체 데이터 조회: ${tableName}, ${result.length}`
);
// 디버깅: 조회된 데이터 샘플 출력
if (result.length > 0) {
logger.info(
`📊 조회된 데이터 샘플: ${JSON.stringify(result[0])?.substring(0, 300)}`
);
}
return result;
}
// 3. 알 수 없는 모드 (기본값으로 처리)
logger.warn(
`⚠️ 알 수 없는 dataSourceType: ${nodeDataSourceType}, context-data로 처리`
);
if (
context.sourceData &&
Array.isArray(context.sourceData) &&
context.sourceData.length > 0
) {
return context.sourceData;
}
return [];
}
/**
* INSERT 액션 노드 실행
*/
private static async executeInsertAction(
node: FlowNode,
inputData: any,
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<any> {
const { targetType } = node.data;
// 🔥 타겟 타입별 분기
switch (targetType) {
case "internal":
return this.executeInternalInsert(node, inputData, context, client);
case "external":
return this.executeExternalInsert(node, inputData, context);
case "api":
return this.executeApiInsert(node, inputData, context);
default:
// 하위 호환성: targetType이 없으면 internal로 간주
logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
return this.executeInternalInsert(node, inputData, context, client);
}
}
/**
* 내부 DB INSERT 실행
*/
private static async executeInternalInsert(
node: FlowNode,
inputData: any,
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<any> {
const { targetTable, fieldMappings } = node.data;
logger.info(`💾 INSERT 노드 실행: ${targetTable}`);
console.log(
"📥 입력 데이터 타입:",
typeof inputData,
Array.isArray(inputData) ? `배열(${inputData.length}건)` : "단일 객체"
);
if (inputData && inputData.length > 0) {
console.log("📄 첫 번째 입력 데이터:");
console.log(JSON.stringify(inputData[0], null, 2));
console.log("🔑 입력 데이터 필드명:", Object.keys(inputData[0]));
}
// 🔥 트랜잭션 클라이언트가 있으면 사용, 없으면 독립 트랜잭션
const executeInsert = async (txClient: any) => {
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
let insertedCount = 0;
const insertedDataArray: any[] = [];
for (const data of dataArray) {
const fields: string[] = [];
const values: any[] = [];
// 🔥 삽입된 데이터 복사본 생성
const insertedData = { ...data };
console.log("🗺️ 필드 매핑 처리 중...");
fieldMappings.forEach((mapping: any) => {
fields.push(mapping.targetField);
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
console.log(
` ${mapping.sourceField}${mapping.targetField}: ${value === undefined ? "❌ undefined" : "✅ " + value}`
);
values.push(value);
// 🔥 삽입된 값을 데이터에 반영
insertedData[mapping.targetField] = value;
});
// 🆕 writer와 company_code 자동 추가 (필드 매핑에 없는 경우)
const hasWriterMapping = fieldMappings.some(
(m: any) => m.targetField === "writer"
);
const hasCompanyCodeMapping = fieldMappings.some(
(m: any) => m.targetField === "company_code"
);
// 컨텍스트에서 사용자 정보 추출
const userId = context.buttonContext?.userId;
const companyCode = context.buttonContext?.companyCode;
// 🔍 디버깅: 자동 추가 조건 확인
console.log(` 🔍 INSERT 자동 추가 조건 확인:`, {
hasWriterMapping,
hasCompanyCodeMapping,
userId,
companyCode,
buttonContext: context.buttonContext,
});
// writer 자동 추가 (매핑에 없고, 컨텍스트에 userId가 있는 경우)
if (!hasWriterMapping && userId) {
fields.push("writer");
values.push(userId);
insertedData.writer = userId;
console.log(` 🔧 자동 추가: writer = ${userId}`);
} else {
console.log(
` ⚠️ writer 자동 추가 스킵: hasWriterMapping=${hasWriterMapping}, userId=${userId}`
);
}
// company_code 자동 추가 (매핑에 없고, 컨텍스트에 companyCode가 있는 경우)
if (!hasCompanyCodeMapping && companyCode && companyCode !== "*") {
fields.push("company_code");
values.push(companyCode);
insertedData.company_code = companyCode;
console.log(` 🔧 자동 추가: company_code = ${companyCode}`);
} else {
console.log(
` ⚠️ company_code 자동 추가 스킵: hasCompanyCodeMapping=${hasCompanyCodeMapping}, companyCode=${companyCode}`
);
}
const sql = `
INSERT INTO ${targetTable} (${fields.join(", ")})
VALUES (${fields.map((_, i) => `$${i + 1}`).join(", ")})
RETURNING *
`;
console.log("📝 실행할 SQL:", sql);
console.log("📊 바인딩 값:", values);
const result = await txClient.query(sql, values);
insertedCount++;
// 🔥 RETURNING으로 받은 실제 삽입 데이터 사용 (AUTO_INCREMENT 등 포함)
if (result.rows && result.rows.length > 0) {
insertedDataArray.push(result.rows[0]);
} else {
// RETURNING이 없으면 생성한 데이터 사용
insertedDataArray.push(insertedData);
}
}
logger.info(
`✅ INSERT 완료 (내부 DB): ${targetTable}, ${insertedCount}`
);
// 🔥 삽입된 데이터 반환 (AUTO_INCREMENT ID 등 포함)
return insertedDataArray;
};
// 🔥 클라이언트가 전달되었으면 사용, 아니면 독립 트랜잭션 생성
if (client) {
return executeInsert(client);
} else {
return transaction(executeInsert);
}
}
/**
* 외부 DB INSERT 실행
*/
private static async executeExternalInsert(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const {
externalConnectionId,
externalDbType,
externalTargetTable,
fieldMappings,
} = node.data;
if (!externalConnectionId || !externalTargetTable) {
throw new Error("외부 DB 커넥션 또는 테이블이 설정되지 않았습니다.");
}
logger.info(
`🔌 외부 DB INSERT 시작: ${externalDbType} - ${externalTargetTable}`
);
// 외부 DB 커넥터 생성
const connector = await this.createExternalConnector(
externalConnectionId,
externalDbType
);
try {
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
let insertedCount = 0;
const insertedDataArray: any[] = [];
// 🔥 Oracle의 경우 autoCommit을 false로 설정하여 트랜잭션 제어
const isOracle = externalDbType.toLowerCase() === "oracle";
for (const data of dataArray) {
const fields: string[] = [];
const values: any[] = [];
const insertedData: any = { ...data };
fieldMappings.forEach((mapping: any) => {
fields.push(mapping.targetField);
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
values.push(value);
// 🔥 삽입된 데이터 객체에 매핑된 값 적용
insertedData[mapping.targetField] = value;
});
// 외부 DB별 SQL 문법 차이 처리
let sql: string;
let params: any[];
if (isOracle) {
// Oracle: :1, :2, ... 형식
const placeholders = fields.map((_, i) => `:${i + 1}`).join(", ");
sql = `INSERT INTO ${externalTargetTable} (${fields.join(", ")}) VALUES (${placeholders})`;
params = values;
} else if (
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
) {
// MySQL/MariaDB: ? 형식
const placeholders = fields.map(() => "?").join(", ");
sql = `INSERT INTO ${externalTargetTable} (${fields.join(", ")}) VALUES (${placeholders})`;
params = values;
} else if (externalDbType.toLowerCase() === "mssql") {
// MSSQL: @p1, @p2, ... 형식
const placeholders = fields.map((_, i) => `@p${i + 1}`).join(", ");
sql = `INSERT INTO ${externalTargetTable} (${fields.join(", ")}) VALUES (${placeholders})`;
params = values;
} else {
// PostgreSQL: $1, $2, ... 형식 (기본)
const placeholders = fields.map((_, i) => `$${i + 1}`).join(", ");
sql = `INSERT INTO ${externalTargetTable} (${fields.join(", ")}) VALUES (${placeholders})`;
params = values;
}
await connector.executeQuery(sql, params);
insertedCount++;
insertedDataArray.push(insertedData);
}
// 🔥 Oracle의 경우 명시적 COMMIT
await this.commitExternalTransaction(
connector,
externalDbType,
insertedCount
);
logger.info(
`✅ INSERT 완료 (외부 DB): ${externalTargetTable}, ${insertedCount}`
);
// 🔥 삽입된 데이터 반환 (외부 DB는 자동 생성 ID 없으므로 입력 데이터 기반)
return insertedDataArray;
} catch (error) {
// 🔥 Oracle의 경우 오류 시 ROLLBACK
await this.rollbackExternalTransaction(connector, externalDbType);
throw error;
} finally {
// 연결 해제
await connector.disconnect();
}
}
/**
* REST API INSERT 실행 (POST 요청)
*/
private static async executeApiInsert(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const {
apiEndpoint,
apiMethod,
apiAuthType,
apiAuthConfig,
apiHeaders,
apiBodyTemplate,
fieldMappings,
} = node.data;
if (!apiEndpoint) {
throw new Error("API 엔드포인트가 설정되지 않았습니다.");
}
logger.info(`🌐 REST API INSERT 시작: ${apiMethod} ${apiEndpoint}`);
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
const results: any[] = [];
for (const data of dataArray) {
// 헤더 설정
const headers: any = { ...apiHeaders };
// 인증 헤더 추가
if (apiAuthType === "bearer" && apiAuthConfig?.token) {
headers["Authorization"] = `Bearer ${apiAuthConfig.token}`;
} else if (
apiAuthType === "basic" &&
apiAuthConfig?.username &&
apiAuthConfig?.password
) {
const credentials = Buffer.from(
`${apiAuthConfig.username}:${apiAuthConfig.password}`
).toString("base64");
headers["Authorization"] = `Basic ${credentials}`;
} else if (apiAuthType === "apikey" && apiAuthConfig?.apiKey) {
const headerName = apiAuthConfig.apiKeyHeader || "X-API-Key";
headers[headerName] = apiAuthConfig.apiKey;
}
// Content-Type 기본값 설정
if (!headers["Content-Type"]) {
headers["Content-Type"] = "application/json";
}
// 바디 생성 (템플릿 또는 필드 매핑)
let body: any;
if (apiBodyTemplate) {
// 템플릿 변수 치환
body = this.replaceTemplateVariables(apiBodyTemplate, data);
} else if (fieldMappings && fieldMappings.length > 0) {
// 필드 매핑 사용
body = {};
fieldMappings.forEach((mapping: any) => {
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
body[mapping.targetField] = value;
});
} else {
// 전체 데이터 전송
body = data;
}
try {
const response = await axios({
method: apiMethod || "POST",
url: apiEndpoint,
headers,
data: body,
timeout: 30000, // 30초 타임아웃
});
results.push({
status: response.status,
data: response.data,
});
} catch (error: any) {
logger.error(
`❌ API 요청 실패: ${error.response?.status || error.message}`
);
throw error;
}
}
logger.info(`✅ REST API INSERT 완료: ${results.length}`);
return { results };
}
/**
* 템플릿 변수 치환 ({{variable}} 형식)
*/
private static replaceTemplateVariables(template: string, data: any): string {
let result = template;
// {{variable}} 형식의 변수를 찾아서 치환
const matches = template.match(/\{\{([^}]+)\}\}/g);
if (matches) {
matches.forEach((match) => {
const key = match.replace(/\{\{|\}\}/g, "").trim();
const value = this.getNestedValue(data, key);
result = result.replace(match, value !== undefined ? value : "");
});
}
return result;
}
/**
* 중첩된 객체 값 가져오기 (예: "user.name")
*/
private static getNestedValue(obj: any, path: string): any {
return path.split(".").reduce((current, key) => current?.[key], obj);
}
/**
* 외부 DB 커넥터 생성 (공통 로직)
*/
private static async createExternalConnector(
connectionId: number,
dbType: string
): Promise<any> {
// 🔥 연결 풀 서비스를 통한 연결 관리 (연결 풀 고갈 방지)
const { ExternalDbConnectionPoolService } = await import(
"./externalDbConnectionPoolService"
);
const poolService = ExternalDbConnectionPoolService.getInstance();
const pool = await poolService.getPool(connectionId);
// DatabaseConnectorFactory와 호환되도록 래퍼 객체 반환
return {
executeQuery: async (sql: string, params?: any[]) => {
const result = await pool.query(sql, params);
return {
rows: Array.isArray(result) ? result : [result],
rowCount: Array.isArray(result) ? result.length : 1,
affectedRows: Array.isArray(result) ? result.length : 1,
};
},
disconnect: async () => {
// 연결 풀은 자동 관리되므로 즉시 종료하지 않음
logger.debug(`📌 연결 풀 유지 (ID: ${connectionId})`);
},
};
}
/**
* 외부 DB 트랜잭션 커밋 (Oracle 전용)
*/
private static async commitExternalTransaction(
connector: any,
dbType: string,
count: number
): Promise<void> {
if (dbType.toLowerCase() === "oracle" && count > 0) {
await connector.executeQuery("COMMIT");
logger.info(`✅ Oracle COMMIT 실행: ${count}`);
}
}
/**
* 외부 DB 트랜잭션 롤백 (Oracle 전용)
*/
private static async rollbackExternalTransaction(
connector: any,
dbType: string
): Promise<void> {
if (dbType.toLowerCase() === "oracle") {
try {
await connector.executeQuery("ROLLBACK");
logger.info(`⚠️ Oracle ROLLBACK 실행 (오류 발생)`);
} catch (rollbackError) {
logger.error(`❌ Oracle ROLLBACK 실패:`, rollbackError);
}
}
}
/**
* UPDATE 액션 노드 실행
*/
private static async executeUpdateAction(
node: FlowNode,
inputData: any,
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<any> {
const { targetType } = node.data;
// 🔥 타겟 타입별 분기
switch (targetType) {
case "internal":
return this.executeInternalUpdate(node, inputData, context, client);
case "external":
return this.executeExternalUpdate(node, inputData, context);
case "api":
return this.executeApiUpdate(node, inputData, context);
default:
// 하위 호환성: targetType이 없으면 internal로 간주
logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
return this.executeInternalUpdate(node, inputData, context, client);
}
}
/**
* 내부 DB UPDATE 실행
*/
private static async executeInternalUpdate(
node: FlowNode,
inputData: any,
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<any> {
const { targetTable, fieldMappings, whereConditions } = node.data;
logger.info(`🔄 UPDATE 노드 실행: ${targetTable}`);
console.log(
"📥 입력 데이터 타입:",
typeof inputData,
Array.isArray(inputData) ? `배열(${inputData.length}건)` : "단일 객체"
);
if (inputData && inputData.length > 0) {
console.log("📄 첫 번째 입력 데이터:");
console.log(JSON.stringify(inputData[0], null, 2));
console.log("🔑 입력 데이터 필드명:", Object.keys(inputData[0]));
}
// 🔥 트랜잭션 클라이언트가 있으면 사용, 없으면 독립 트랜잭션
const executeUpdate = async (txClient: any) => {
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
let updatedCount = 0;
const updatedDataArray: any[] = [];
// 🆕 table-all 모드: 각 그룹별로 UPDATE 실행 (집계 결과 반영)
if (context.currentNodeDataSourceType === "table-all") {
console.log(
"🚀 table-all 모드: 그룹별 업데이트 시작 (총 " +
dataArray.length +
"개 그룹)"
);
// 🔥 각 그룹(데이터)별로 UPDATE 실행
for (let i = 0; i < dataArray.length; i++) {
const data = dataArray[i];
const setClauses: string[] = [];
const values: any[] = [];
let paramIndex = 1;
console.log(`\n📦 그룹 ${i + 1}/${dataArray.length} 처리 중...`);
console.log("🗺️ 필드 매핑 처리 중...");
fieldMappings.forEach((mapping: any) => {
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
console.log(
` ${mapping.sourceField}${mapping.targetField}: ${value === undefined ? "❌ undefined" : "✅ " + value}`
);
if (mapping.targetField) {
setClauses.push(`${mapping.targetField} = $${paramIndex}`);
values.push(value);
paramIndex++;
}
});
// WHERE 조건 (사용자 정의 조건만 사용, PK 자동 추가 안 함)
const whereResult = this.buildWhereClause(
whereConditions,
data,
paramIndex
);
values.push(...whereResult.values);
const sql = `
UPDATE ${targetTable}
SET ${setClauses.join(", ")}
${whereResult.clause}
`;
console.log("📝 실행할 SQL:", sql);
console.log("📊 바인딩 값:", values);
const result = await txClient.query(sql, values);
const rowCount = result.rowCount || 0;
updatedCount += rowCount;
console.log(`✅ 그룹 ${i + 1} UPDATE 완료: ${rowCount}`);
}
logger.info(
`✅ UPDATE 완료 (내부 DB, 그룹별 처리): ${targetTable}, 총 ${updatedCount}`
);
// 업데이트된 데이터는 원본 배열 반환 (실제 DB에서 다시 조회하지 않음)
return dataArray;
}
// 🆕 context-data 모드: 개별 업데이트 (PK 자동 추가)
console.log("🎯 context-data 모드: 개별 업데이트 시작");
for (const data of dataArray) {
const setClauses: string[] = [];
const values: any[] = [];
let paramIndex = 1;
// 🔥 업데이트된 데이터 복사본 생성
const updatedData = { ...data };
console.log("🗺️ 필드 매핑 처리 중...");
fieldMappings.forEach((mapping: any) => {
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
console.log(
` ${mapping.sourceField}${mapping.targetField}: ${value === undefined ? "❌ undefined" : "✅ " + value}`
);
// targetField가 비어있지 않은 경우만 추가
if (mapping.targetField) {
setClauses.push(`${mapping.targetField} = $${paramIndex}`);
values.push(value);
paramIndex++;
// 🔥 업데이트된 값을 데이터에 반영
updatedData[mapping.targetField] = value;
} else {
console.log(
`⚠️ targetField가 비어있어 스킵: ${mapping.sourceField}`
);
}
});
// 🔑 Primary Key 자동 추가 (context-data 모드)
console.log("🔑 context-data 모드: Primary Key 자동 추가");
const enhancedWhereConditions = await this.enhanceWhereConditionsWithPK(
whereConditions,
data,
targetTable
);
const whereResult = this.buildWhereClause(
enhancedWhereConditions,
data,
paramIndex
);
// WHERE 절의 값들을 values 배열에 추가
values.push(...whereResult.values);
const sql = `
UPDATE ${targetTable}
SET ${setClauses.join(", ")}
${whereResult.clause}
`;
console.log("📝 실행할 SQL:", sql);
console.log("📊 바인딩 값:", values);
const result = await txClient.query(sql, values);
updatedCount += result.rowCount || 0;
// 🔥 업데이트된 데이터 저장
updatedDataArray.push(updatedData);
}
logger.info(
`✅ UPDATE 완료 (내부 DB): ${targetTable}, ${updatedCount}`
);
// 🔥 업데이트된 데이터 반환 (다음 노드에서 사용)
return updatedDataArray;
};
// 🔥 클라이언트가 전달되었으면 사용, 없으면 독립 트랜잭션 생성
if (client) {
return executeUpdate(client);
} else {
return transaction(executeUpdate);
}
}
/**
* 외부 DB UPDATE 실행
*/
private static async executeExternalUpdate(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const {
externalConnectionId,
externalDbType,
externalTargetTable,
fieldMappings,
whereConditions,
} = node.data;
if (!externalConnectionId || !externalTargetTable) {
throw new Error("외부 DB 커넥션 또는 테이블이 설정되지 않았습니다.");
}
logger.info(
`🔌 외부 DB UPDATE 시작: ${externalDbType} - ${externalTargetTable}`
);
// 외부 DB 커넥터 생성
const connector = await this.createExternalConnector(
externalConnectionId,
externalDbType
);
try {
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
let updatedCount = 0;
const updatedDataArray: any[] = [];
for (const data of dataArray) {
const setClauses: string[] = [];
const values: any[] = [];
let paramIndex = 1;
const updatedData: any = { ...data };
fieldMappings.forEach((mapping: any) => {
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
// DB별 플레이스홀더
if (externalDbType.toLowerCase() === "oracle") {
setClauses.push(`${mapping.targetField} = :${paramIndex}`);
} else if (
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
) {
setClauses.push(`${mapping.targetField} = ?`);
} else if (externalDbType.toLowerCase() === "mssql") {
setClauses.push(`${mapping.targetField} = @p${paramIndex}`);
} else {
setClauses.push(`${mapping.targetField} = $${paramIndex}`);
}
values.push(value);
paramIndex++;
// 🔥 업데이트된 데이터 객체에 매핑된 값 적용
updatedData[mapping.targetField] = value;
});
// WHERE 조건 생성
const whereClauses: string[] = [];
whereConditions?.forEach((condition: any) => {
// 🔥 수정: sourceField가 있으면 소스 데이터에서 값을 가져옴
let condValue: any;
if (condition.sourceField) {
condValue = data[condition.sourceField];
} else if (
condition.staticValue !== undefined &&
condition.staticValue !== ""
) {
condValue = condition.staticValue;
} else {
condValue = data[condition.field];
}
if (condition.operator === "IS NULL") {
whereClauses.push(`${condition.field} IS NULL`);
} else if (condition.operator === "IS NOT NULL") {
whereClauses.push(`${condition.field} IS NOT NULL`);
} else {
if (externalDbType.toLowerCase() === "oracle") {
whereClauses.push(
`${condition.field} ${condition.operator} :${paramIndex}`
);
} else if (
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
) {
whereClauses.push(`${condition.field} ${condition.operator} ?`);
} else if (externalDbType.toLowerCase() === "mssql") {
whereClauses.push(
`${condition.field} ${condition.operator} @p${paramIndex}`
);
} else {
whereClauses.push(
`${condition.field} ${condition.operator} $${paramIndex}`
);
}
values.push(condValue);
paramIndex++;
}
});
const whereClause =
whereClauses.length > 0 ? `WHERE ${whereClauses.join(" AND ")}` : "";
const sql = `UPDATE ${externalTargetTable} SET ${setClauses.join(", ")} ${whereClause}`;
const result = await connector.executeQuery(sql, values);
updatedCount += result.rowCount || result.affectedRows || 0;
updatedDataArray.push(updatedData);
}
// 🔥 Oracle의 경우 명시적 COMMIT
await this.commitExternalTransaction(
connector,
externalDbType,
updatedCount
);
logger.info(
`✅ UPDATE 완료 (외부 DB): ${externalTargetTable}, ${updatedCount}`
);
// 🔥 업데이트된 데이터 반환
return updatedDataArray;
} catch (error) {
// 🔥 Oracle의 경우 오류 시 ROLLBACK
await this.rollbackExternalTransaction(connector, externalDbType);
throw error;
} finally {
await connector.disconnect();
}
}
/**
* REST API UPDATE 실행 (PUT/PATCH 요청)
*/
private static async executeApiUpdate(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const {
apiEndpoint,
apiMethod,
apiAuthType,
apiAuthConfig,
apiHeaders,
apiBodyTemplate,
fieldMappings,
} = node.data;
if (!apiEndpoint) {
throw new Error("API 엔드포인트가 설정되지 않았습니다.");
}
logger.info(`🌐 REST API UPDATE 시작: ${apiMethod} ${apiEndpoint}`);
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
const results: any[] = [];
for (const data of dataArray) {
// 헤더 설정
const headers: any = { ...apiHeaders };
// 인증 헤더 추가
if (apiAuthType === "bearer" && apiAuthConfig?.token) {
headers["Authorization"] = `Bearer ${apiAuthConfig.token}`;
} else if (
apiAuthType === "basic" &&
apiAuthConfig?.username &&
apiAuthConfig?.password
) {
const credentials = Buffer.from(
`${apiAuthConfig.username}:${apiAuthConfig.password}`
).toString("base64");
headers["Authorization"] = `Basic ${credentials}`;
} else if (apiAuthType === "apikey" && apiAuthConfig?.apiKey) {
const headerName = apiAuthConfig.apiKeyHeader || "X-API-Key";
headers[headerName] = apiAuthConfig.apiKey;
}
if (!headers["Content-Type"]) {
headers["Content-Type"] = "application/json";
}
// 바디 생성
let body: any;
if (apiBodyTemplate) {
body = this.replaceTemplateVariables(apiBodyTemplate, data);
} else if (fieldMappings && fieldMappings.length > 0) {
body = {};
fieldMappings.forEach((mapping: any) => {
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
body[mapping.targetField] = value;
});
} else {
body = data;
}
try {
const response = await axios({
method: apiMethod || "PUT",
url: apiEndpoint,
headers,
data: body,
timeout: 30000,
});
results.push({
status: response.status,
data: response.data,
});
} catch (error: any) {
logger.error(
`❌ API 요청 실패: ${error.response?.status || error.message}`
);
throw error;
}
}
logger.info(`✅ REST API UPDATE 완료: ${results.length}`);
return { results };
}
/**
* DELETE 액션 노드 실행
*/
private static async executeDeleteAction(
node: FlowNode,
inputData: any,
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<any> {
const { targetType } = node.data;
// 🔥 타겟 타입별 분기
switch (targetType) {
case "internal":
return this.executeInternalDelete(node, inputData, context, client);
case "external":
return this.executeExternalDelete(node, inputData, context);
case "api":
return this.executeApiDelete(node, inputData, context);
default:
// 하위 호환성: targetType이 없으면 internal로 간주
logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
return this.executeInternalDelete(node, inputData, context, client);
}
}
/**
* 내부 DB DELETE 실행
*/
private static async executeInternalDelete(
node: FlowNode,
inputData: any,
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<any> {
const { targetTable, whereConditions } = node.data;
logger.info(`🗑️ DELETE 노드 실행: ${targetTable}`);
console.log(
"📥 입력 데이터 타입:",
typeof inputData,
Array.isArray(inputData) ? `배열(${inputData.length}건)` : "단일 객체"
);
if (inputData && inputData.length > 0) {
console.log("📄 첫 번째 입력 데이터:");
console.log(JSON.stringify(inputData[0], null, 2));
console.log("🔑 입력 데이터 필드명:", Object.keys(inputData[0]));
}
// 🔥 트랜잭션 클라이언트가 있으면 사용, 없으면 독립 트랜잭션
const executeDelete = async (txClient: any) => {
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
let deletedCount = 0;
const deletedDataArray: any[] = [];
// 🆕 table-all 모드: 단일 SQL로 일괄 삭제
if (context.currentNodeDataSourceType === "table-all") {
console.log("🚀 table-all 모드: 단일 SQL로 일괄 삭제 시작");
// 첫 번째 데이터를 참조하여 WHERE 절 생성
const firstData = dataArray[0];
// WHERE 조건 (사용자 정의 조건만 사용, PK 자동 추가 안 함)
const whereResult = this.buildWhereClause(
whereConditions,
firstData,
1
);
const sql = `DELETE FROM ${targetTable} ${whereResult.clause} RETURNING *`;
console.log("📝 실행할 SQL (일괄 처리):", sql);
console.log("📊 바인딩 값:", whereResult.values);
const result = await txClient.query(sql, whereResult.values);
deletedCount = result.rowCount || 0;
// 🔥 RETURNING으로 받은 삭제된 데이터 저장
if (result.rows && result.rows.length > 0) {
deletedDataArray.push(...result.rows);
}
logger.info(
`✅ DELETE 완료 (내부 DB, 일괄 처리): ${targetTable}, ${deletedCount}`
);
return deletedDataArray;
}
// 🆕 context-data 모드: 개별 삭제 (PK 자동 추가)
console.log("🎯 context-data 모드: 개별 삭제 시작");
for (const data of dataArray) {
console.log("🔍 WHERE 조건 처리 중...");
// 🔑 Primary Key 자동 추가 (context-data 모드)
console.log("🔑 context-data 모드: Primary Key 자동 추가");
const enhancedWhereConditions = await this.enhanceWhereConditionsWithPK(
whereConditions,
data,
targetTable
);
const whereResult = this.buildWhereClause(
enhancedWhereConditions,
data,
1
);
const sql = `DELETE FROM ${targetTable} ${whereResult.clause} RETURNING *`;
console.log("📝 실행할 SQL:", sql);
console.log("📊 바인딩 값:", whereResult.values);
const result = await txClient.query(sql, whereResult.values);
deletedCount += result.rowCount || 0;
// 🔥 RETURNING으로 받은 삭제된 데이터 저장
if (result.rows && result.rows.length > 0) {
deletedDataArray.push(...result.rows);
}
}
logger.info(
`✅ DELETE 완료 (내부 DB): ${targetTable}, ${deletedCount}`
);
// 🔥 삭제된 데이터 반환 (로그 기록 등에 사용)
return deletedDataArray;
};
// 🔥 클라이언트가 전달되었으면 사용, 없으면 독립 트랜잭션 생성
if (client) {
return executeDelete(client);
} else {
return transaction(executeDelete);
}
}
/**
* 외부 DB DELETE 실행
*/
private static async executeExternalDelete(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const {
externalConnectionId,
externalDbType,
externalTargetTable,
whereConditions,
} = node.data;
if (!externalConnectionId || !externalTargetTable) {
throw new Error("외부 DB 커넥션 또는 테이블이 설정되지 않았습니다.");
}
logger.info(
`🔌 외부 DB DELETE 시작: ${externalDbType} - ${externalTargetTable}`
);
// 외부 DB 커넥터 생성
const connector = await this.createExternalConnector(
externalConnectionId,
externalDbType
);
try {
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
let deletedCount = 0;
const deletedDataArray: any[] = [];
for (const data of dataArray) {
const whereClauses: string[] = [];
const values: any[] = [];
let paramIndex = 1;
// WHERE 조건 생성
whereConditions?.forEach((condition: any) => {
// 🔥 수정: sourceField가 있으면 소스 데이터에서 값을 가져옴
let condValue: any;
if (condition.sourceField) {
condValue = data[condition.sourceField];
} else if (
condition.staticValue !== undefined &&
condition.staticValue !== ""
) {
condValue = condition.staticValue;
} else {
condValue = data[condition.field];
}
if (condition.operator === "IS NULL") {
whereClauses.push(`${condition.field} IS NULL`);
} else if (condition.operator === "IS NOT NULL") {
whereClauses.push(`${condition.field} IS NOT NULL`);
} else {
if (externalDbType.toLowerCase() === "oracle") {
whereClauses.push(
`${condition.field} ${condition.operator} :${paramIndex}`
);
} else if (
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
) {
whereClauses.push(`${condition.field} ${condition.operator} ?`);
} else if (externalDbType.toLowerCase() === "mssql") {
whereClauses.push(
`${condition.field} ${condition.operator} @p${paramIndex}`
);
} else {
whereClauses.push(
`${condition.field} ${condition.operator} $${paramIndex}`
);
}
values.push(condValue);
paramIndex++;
}
});
const whereClause =
whereClauses.length > 0 ? `WHERE ${whereClauses.join(" AND ")}` : "";
if (!whereClause) {
throw new Error(
"DELETE 작업에 WHERE 조건이 필요합니다. (전체 삭제 방지)"
);
}
// 🔥 삭제 전에 데이터 조회 (로그 기록 용도)
const selectSql = `SELECT * FROM ${externalTargetTable} ${whereClause}`;
const selectResult = await connector.executeQuery(selectSql, values);
if (selectResult && selectResult.length > 0) {
deletedDataArray.push(...selectResult);
}
// 실제 삭제 수행
const deleteSql = `DELETE FROM ${externalTargetTable} ${whereClause}`;
const result = await connector.executeQuery(deleteSql, values);
deletedCount += result.rowCount || result.affectedRows || 0;
}
// 🔥 Oracle의 경우 명시적 COMMIT
await this.commitExternalTransaction(
connector,
externalDbType,
deletedCount
);
logger.info(
`✅ DELETE 완료 (외부 DB): ${externalTargetTable}, ${deletedCount}`
);
// 🔥 삭제된 데이터 반환
return deletedDataArray;
} catch (error) {
// 🔥 Oracle의 경우 오류 시 ROLLBACK
await this.rollbackExternalTransaction(connector, externalDbType);
throw error;
} finally {
await connector.disconnect();
}
}
/**
* REST API DELETE 실행 (DELETE 요청)
*/
private static async executeApiDelete(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const { apiEndpoint, apiAuthType, apiAuthConfig, apiHeaders } = node.data;
if (!apiEndpoint) {
throw new Error("API 엔드포인트가 설정되지 않았습니다.");
}
logger.info(`🌐 REST API DELETE 시작: ${apiEndpoint}`);
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
const results: any[] = [];
for (const data of dataArray) {
// 헤더 설정
const headers: any = { ...apiHeaders };
// 인증 헤더 추가
if (apiAuthType === "bearer" && apiAuthConfig?.token) {
headers["Authorization"] = `Bearer ${apiAuthConfig.token}`;
} else if (
apiAuthType === "basic" &&
apiAuthConfig?.username &&
apiAuthConfig?.password
) {
const credentials = Buffer.from(
`${apiAuthConfig.username}:${apiAuthConfig.password}`
).toString("base64");
headers["Authorization"] = `Basic ${credentials}`;
} else if (apiAuthType === "apikey" && apiAuthConfig?.apiKey) {
const headerName = apiAuthConfig.apiKeyHeader || "X-API-Key";
headers[headerName] = apiAuthConfig.apiKey;
}
// DELETE는 일반적으로 URL 파라미터 또는 경로에 ID 포함
// 템플릿 변수 치환 지원 (예: /api/users/{{id}})
const url = this.replaceTemplateVariables(apiEndpoint, data);
try {
const response = await axios({
method: "DELETE",
url,
headers,
timeout: 30000,
});
results.push({
status: response.status,
data: response.data,
});
} catch (error: any) {
logger.error(
`❌ API 요청 실패: ${error.response?.status || error.message}`
);
throw error;
}
}
logger.info(`✅ REST API DELETE 완료: ${results.length}`);
return { results };
}
/**
* UPSERT 액션 노드 실행
*/
private static async executeUpsertAction(
node: FlowNode,
inputData: any,
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<any> {
const { targetType } = node.data;
// 🔥 타겟 타입별 분기
switch (targetType) {
case "internal":
return this.executeInternalUpsert(node, inputData, context, client);
case "external":
return this.executeExternalUpsert(node, inputData, context);
case "api":
return this.executeApiUpsert(node, inputData, context);
default:
// 하위 호환성: targetType이 없으면 internal로 간주
logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
return this.executeInternalUpsert(node, inputData, context, client);
}
}
/**
* 내부 DB UPSERT 실행 (로직 기반)
* DB 제약 조건 없이 SELECT → UPDATE or INSERT 방식으로 구현
*/
private static async executeInternalUpsert(
node: FlowNode,
inputData: any,
context: ExecutionContext,
client?: any // 🔥 트랜잭션 클라이언트 (optional)
): Promise<any> {
const { targetTable, fieldMappings, conflictKeys } = node.data;
if (!targetTable || !fieldMappings || fieldMappings.length === 0) {
throw new Error("UPSERT 액션에 필수 설정이 누락되었습니다.");
}
if (!conflictKeys || conflictKeys.length === 0) {
throw new Error("UPSERT 액션에 충돌 키(Conflict Keys)가 필요합니다.");
}
logger.info(`🔀 UPSERT 노드 실행: ${targetTable}`);
console.log(
"📥 입력 데이터 타입:",
typeof inputData,
Array.isArray(inputData) ? `배열(${inputData.length}건)` : "단일 객체"
);
if (inputData && inputData.length > 0) {
console.log("📄 첫 번째 입력 데이터:");
console.log(JSON.stringify(inputData[0], null, 2));
console.log("🔑 입력 데이터 필드명:", Object.keys(inputData[0]));
}
console.log("🔑 충돌 키:", conflictKeys);
// 🔥 트랜잭션 클라이언트가 있으면 사용, 없으면 독립 트랜잭션
const executeUpsert = async (txClient: any) => {
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
let insertedCount = 0;
let updatedCount = 0;
for (const data of dataArray) {
// 1. 충돌 키 값 추출
const conflictKeyValues: Record<string, any> = {};
conflictKeys.forEach((key: string) => {
const mapping = fieldMappings.find((m: any) => m.targetField === key);
if (mapping) {
conflictKeyValues[key] =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
}
});
// 2. 존재 여부 확인 (SELECT)
const whereConditions = conflictKeys
.map((key: string, index: number) => `${key} = $${index + 1}`)
.join(" AND ");
const whereValues = conflictKeys.map(
(key: string) => conflictKeyValues[key]
);
console.log("🔍 존재 여부 확인 - WHERE 조건:", whereConditions);
console.log("🔍 존재 여부 확인 - 바인딩 값:", whereValues);
const checkSql = `SELECT 1 FROM ${targetTable} WHERE ${whereConditions} LIMIT 1`;
const existingRow = await txClient.query(checkSql, whereValues);
if (existingRow.rows.length > 0) {
// 3-A. 존재하면 UPDATE
const setClauses: string[] = [];
const updateValues: any[] = [];
let paramIndex = 1;
fieldMappings.forEach((mapping: any) => {
// 충돌 키가 아닌 필드만 UPDATE
if (!conflictKeys.includes(mapping.targetField)) {
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
setClauses.push(`${mapping.targetField} = $${paramIndex}`);
updateValues.push(value);
paramIndex++;
}
});
// WHERE 조건 생성 (파라미터 인덱스 이어서)
const updateWhereConditions = conflictKeys
.map(
(key: string, index: number) => `${key} = $${paramIndex + index}`
)
.join(" AND ");
// WHERE 조건 값 추가
whereValues.forEach((val: any) => {
updateValues.push(val);
});
const updateSql = `
UPDATE ${targetTable}
SET ${setClauses.join(", ")}
WHERE ${updateWhereConditions}
`;
logger.info(`🔄 UPDATE 실행:`, {
table: targetTable,
conflictKeys,
conflictKeyValues,
sql: updateSql,
values: updateValues,
});
await txClient.query(updateSql, updateValues);
updatedCount++;
} else {
// 3-B. 없으면 INSERT
const columns: string[] = [];
const values: any[] = [];
fieldMappings.forEach((mapping: any) => {
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
columns.push(mapping.targetField);
values.push(value);
});
// 🆕 writer와 company_code 자동 추가 (필드 매핑에 없는 경우)
const hasWriterMapping = fieldMappings.some(
(m: any) => m.targetField === "writer"
);
const hasCompanyCodeMapping = fieldMappings.some(
(m: any) => m.targetField === "company_code"
);
// 컨텍스트에서 사용자 정보 추출
const userId = context.buttonContext?.userId;
const companyCode = context.buttonContext?.companyCode;
// writer 자동 추가 (매핑에 없고, 컨텍스트에 userId가 있는 경우)
if (!hasWriterMapping && userId) {
columns.push("writer");
values.push(userId);
logger.info(` 🔧 UPSERT INSERT - 자동 추가: writer = ${userId}`);
}
// company_code 자동 추가 (매핑에 없고, 컨텍스트에 companyCode가 있는 경우)
if (!hasCompanyCodeMapping && companyCode && companyCode !== "*") {
columns.push("company_code");
values.push(companyCode);
logger.info(
` 🔧 UPSERT INSERT - 자동 추가: company_code = ${companyCode}`
);
}
const placeholders = values.map((_, i) => `$${i + 1}`).join(", ");
const insertSql = `
INSERT INTO ${targetTable} (${columns.join(", ")})
VALUES (${placeholders})
`;
logger.info(` INSERT 실행:`, {
table: targetTable,
conflictKeys,
conflictKeyValues,
});
await txClient.query(insertSql, values);
insertedCount++;
}
}
logger.info(
`✅ UPSERT 완료 (내부 DB): ${targetTable}, INSERT ${insertedCount}건, UPDATE ${updatedCount}`
);
return {
insertedCount,
updatedCount,
totalCount: insertedCount + updatedCount,
};
};
// 🔥 클라이언트가 전달되었으면 사용, 아니면 독립 트랜잭션 생성
if (client) {
return executeUpsert(client);
} else {
return transaction(executeUpsert);
}
}
/**
* 외부 DB UPSERT 실행 (로직 기반)
*/
private static async executeExternalUpsert(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const {
externalConnectionId,
externalDbType,
externalTargetTable,
fieldMappings,
conflictKeys,
} = node.data;
if (!externalConnectionId || !externalTargetTable) {
throw new Error("외부 DB 커넥션 또는 테이블이 설정되지 않았습니다.");
}
if (!fieldMappings || fieldMappings.length === 0) {
throw new Error("UPSERT 액션에 필수 설정이 누락되었습니다.");
}
if (!conflictKeys || conflictKeys.length === 0) {
throw new Error("UPSERT 액션에 충돌 키(Conflict Keys)가 필요합니다.");
}
logger.info(
`🔌 외부 DB UPSERT 시작: ${externalDbType} - ${externalTargetTable}`
);
// 외부 DB 커넥터 생성
const connector = await this.createExternalConnector(
externalConnectionId,
externalDbType
);
try {
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
let insertedCount = 0;
let updatedCount = 0;
for (const data of dataArray) {
// 1. 충돌 키 값 추출
const conflictKeyValues: Record<string, any> = {};
conflictKeys.forEach((key: string) => {
const mapping = fieldMappings.find((m: any) => m.targetField === key);
if (mapping) {
conflictKeyValues[key] =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
}
});
// 2. 존재 여부 확인 (SELECT)
const whereClauses: string[] = [];
const whereValues: any[] = [];
let paramIndex = 1;
conflictKeys.forEach((key: string) => {
if (externalDbType.toLowerCase() === "oracle") {
whereClauses.push(`${key} = :${paramIndex}`);
} else if (
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
) {
whereClauses.push(`${key} = ?`);
} else if (externalDbType.toLowerCase() === "mssql") {
whereClauses.push(`${key} = @p${paramIndex}`);
} else {
whereClauses.push(`${key} = $${paramIndex}`);
}
whereValues.push(conflictKeyValues[key]);
paramIndex++;
});
const checkSql = `SELECT * FROM ${externalTargetTable} WHERE ${whereClauses.join(" AND ")} LIMIT 1`;
const existingRow = await connector.executeQuery(checkSql, whereValues);
const hasExistingRow =
existingRow.rows?.length > 0 || existingRow.length > 0;
if (hasExistingRow) {
// 3-A. 존재하면 UPDATE
const setClauses: string[] = [];
const updateValues: any[] = [];
paramIndex = 1;
fieldMappings.forEach((mapping: any) => {
if (!conflictKeys.includes(mapping.targetField)) {
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
if (externalDbType.toLowerCase() === "oracle") {
setClauses.push(`${mapping.targetField} = :${paramIndex}`);
} else if (
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
) {
setClauses.push(`${mapping.targetField} = ?`);
} else if (externalDbType.toLowerCase() === "mssql") {
setClauses.push(`${mapping.targetField} = @p${paramIndex}`);
} else {
setClauses.push(`${mapping.targetField} = $${paramIndex}`);
}
updateValues.push(value);
paramIndex++;
}
});
// WHERE 조건 생성
const updateWhereClauses: string[] = [];
conflictKeys.forEach((key: string) => {
if (externalDbType.toLowerCase() === "oracle") {
updateWhereClauses.push(`${key} = :${paramIndex}`);
} else if (
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
) {
updateWhereClauses.push(`${key} = ?`);
} else if (externalDbType.toLowerCase() === "mssql") {
updateWhereClauses.push(`${key} = @p${paramIndex}`);
} else {
updateWhereClauses.push(`${key} = $${paramIndex}`);
}
updateValues.push(conflictKeyValues[key]);
paramIndex++;
});
const updateSql = `UPDATE ${externalTargetTable} SET ${setClauses.join(", ")} WHERE ${updateWhereClauses.join(" AND ")}`;
await connector.executeQuery(updateSql, updateValues);
updatedCount++;
} else {
// 3-B. 없으면 INSERT
const columns: string[] = [];
const values: any[] = [];
fieldMappings.forEach((mapping: any) => {
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
columns.push(mapping.targetField);
values.push(value);
});
let insertSql: string;
if (externalDbType.toLowerCase() === "oracle") {
const placeholders = columns.map((_, i) => `:${i + 1}`).join(", ");
insertSql = `INSERT INTO ${externalTargetTable} (${columns.join(", ")}) VALUES (${placeholders})`;
} else if (
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
) {
const placeholders = columns.map(() => "?").join(", ");
insertSql = `INSERT INTO ${externalTargetTable} (${columns.join(", ")}) VALUES (${placeholders})`;
} else if (externalDbType.toLowerCase() === "mssql") {
const placeholders = columns.map((_, i) => `@p${i + 1}`).join(", ");
insertSql = `INSERT INTO ${externalTargetTable} (${columns.join(", ")}) VALUES (${placeholders})`;
} else {
const placeholders = columns.map((_, i) => `$${i + 1}`).join(", ");
insertSql = `INSERT INTO ${externalTargetTable} (${columns.join(", ")}) VALUES (${placeholders})`;
}
await connector.executeQuery(insertSql, values);
insertedCount++;
}
}
// 🔥 Oracle의 경우 명시적 COMMIT
await this.commitExternalTransaction(
connector,
externalDbType,
insertedCount + updatedCount
);
logger.info(
`✅ UPSERT 완료 (외부 DB): ${externalTargetTable}, INSERT ${insertedCount}건, UPDATE ${updatedCount}`
);
return {
insertedCount,
updatedCount,
totalCount: insertedCount + updatedCount,
};
} catch (error) {
// 🔥 Oracle의 경우 오류 시 ROLLBACK
await this.rollbackExternalTransaction(connector, externalDbType);
throw error;
} finally {
await connector.disconnect();
}
}
/**
* REST API UPSERT 실행 (POST/PUT 요청)
* API 응답에 따라 INSERT/UPDATE 판단
*/
private static async executeApiUpsert(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const {
apiEndpoint,
apiMethod,
apiAuthType,
apiAuthConfig,
apiHeaders,
apiBodyTemplate,
fieldMappings,
} = node.data;
if (!apiEndpoint) {
throw new Error("API 엔드포인트가 설정되지 않았습니다.");
}
logger.info(`🌐 REST API UPSERT 시작: ${apiMethod} ${apiEndpoint}`);
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
const results: any[] = [];
for (const data of dataArray) {
// 헤더 설정
const headers: any = { ...apiHeaders };
// 인증 헤더 추가
if (apiAuthType === "bearer" && apiAuthConfig?.token) {
headers["Authorization"] = `Bearer ${apiAuthConfig.token}`;
} else if (
apiAuthType === "basic" &&
apiAuthConfig?.username &&
apiAuthConfig?.password
) {
const credentials = Buffer.from(
`${apiAuthConfig.username}:${apiAuthConfig.password}`
).toString("base64");
headers["Authorization"] = `Basic ${credentials}`;
} else if (apiAuthType === "apikey" && apiAuthConfig?.apiKey) {
const headerName = apiAuthConfig.apiKeyHeader || "X-API-Key";
headers[headerName] = apiAuthConfig.apiKey;
}
if (!headers["Content-Type"]) {
headers["Content-Type"] = "application/json";
}
// 바디 생성
let body: any;
if (apiBodyTemplate) {
body = this.replaceTemplateVariables(apiBodyTemplate, data);
} else if (fieldMappings && fieldMappings.length > 0) {
body = {};
fieldMappings.forEach((mapping: any) => {
const value =
mapping.staticValue !== undefined
? mapping.staticValue
: data[mapping.sourceField];
body[mapping.targetField] = value;
});
} else {
body = data;
}
try {
// UPSERT는 일반적으로 PUT 메서드 사용 (멱등성)
const response = await axios({
method: apiMethod || "PUT",
url: apiEndpoint,
headers,
data: body,
timeout: 30000,
});
results.push({
status: response.status,
data: response.data,
});
} catch (error: any) {
logger.error(
`❌ API 요청 실패: ${error.response?.status || error.message}`
);
throw error;
}
}
logger.info(`✅ REST API UPSERT 완료: ${results.length}`);
return { results };
}
/**
* 조건 노드 실행
*/
private static async executeCondition(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const { conditions, logic } = node.data;
logger.info(
`🔍 조건 노드 실행 - inputData 타입: ${typeof inputData}, 배열 여부: ${Array.isArray(inputData)}, 길이: ${Array.isArray(inputData) ? inputData.length : "N/A"}`
);
logger.info(`🔍 조건 개수: ${conditions?.length || 0}, 로직: ${logic}`);
if (inputData) {
console.log(
"📥 조건 노드 입력 데이터:",
JSON.stringify(inputData, null, 2).substring(0, 500)
);
} else {
console.log("⚠️ 조건 노드 입력 데이터가 없습니다!");
}
// 조건이 없으면 모든 데이터 통과
if (!conditions || conditions.length === 0) {
logger.info("⚠️ 조건이 설정되지 않음 - 모든 데이터 통과");
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
return {
conditionResult: true,
trueData: dataArray,
falseData: [],
allData: dataArray,
};
}
// inputData가 배열인 경우 각 항목을 필터링
if (Array.isArray(inputData)) {
const trueData: any[] = [];
const falseData: any[] = [];
inputData.forEach((item: any) => {
const results = conditions.map((condition: any) => {
const fieldValue = item[condition.field];
let compareValue = condition.value;
if (condition.valueType === "field") {
compareValue = item[condition.value];
logger.info(
`🔄 필드 참조 비교: ${condition.field} (${fieldValue}) vs ${condition.value} (${compareValue})`
);
} else {
logger.info(
`📊 고정값 비교: ${condition.field} (${fieldValue}) vs ${compareValue}`
);
}
return this.evaluateCondition(
fieldValue,
condition.operator,
compareValue
);
});
const result =
logic === "OR"
? results.some((r: boolean) => r)
: results.every((r: boolean) => r);
if (result) {
trueData.push(item);
} else {
falseData.push(item);
}
});
logger.info(
`🔍 조건 필터링 결과: TRUE ${trueData.length}건 / FALSE ${falseData.length}건 (${logic} 로직)`
);
return {
conditionResult: trueData.length > 0,
trueData,
falseData,
allData: inputData,
};
}
// 단일 객체인 경우
const results = conditions.map((condition: any) => {
const fieldValue = inputData[condition.field];
let compareValue = condition.value;
if (condition.valueType === "field") {
compareValue = inputData[condition.value];
logger.info(
`🔄 필드 참조 비교: ${condition.field} (${fieldValue}) vs ${condition.value} (${compareValue})`
);
} else {
logger.info(
`📊 고정값 비교: ${condition.field} (${fieldValue}) vs ${compareValue}`
);
}
return this.evaluateCondition(
fieldValue,
condition.operator,
compareValue
);
});
const result =
logic === "OR"
? results.some((r: boolean) => r)
: results.every((r: boolean) => r);
logger.info(`🔍 조건 평가 결과: ${result} (${logic} 로직)`);
// ⚠️ 조건 노드는 TRUE/FALSE 브랜치를 위한 특별한 처리 필요
// 조건 결과를 저장하고, 원본 데이터는 항상 반환
// 다음 노드에서 sourceHandle을 기반으로 필터링됨
return {
conditionResult: result,
trueData: result ? [inputData] : [],
falseData: result ? [] : [inputData],
allData: [inputData], // 일단 모든 데이터 전달
};
}
/**
* WHERE 절 생성
*/
/**
* 테이블의 Primary Key 컬럼 조회 (내부 DB - PostgreSQL)
*/
private static async getPrimaryKeyColumns(
tableName: string,
schema: string = "public"
): Promise<string[]> {
const sql = `
SELECT a.attname AS column_name
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = $1::regclass
AND i.indisprimary
ORDER BY array_position(i.indkey, a.attnum);
`;
const fullTableName = schema ? `${schema}.${tableName}` : tableName;
try {
const result = await query(sql, [fullTableName]);
const pkColumns = result.map((row: any) => row.column_name);
if (pkColumns.length > 0) {
console.log(
`🔑 테이블 ${tableName}의 Primary Key: ${pkColumns.join(", ")}`
);
} else {
console.log(`⚠️ 테이블 ${tableName}에 Primary Key가 없습니다`);
}
return pkColumns;
} catch (error) {
console.error(`❌ Primary Key 조회 실패 (${tableName}):`, error);
return [];
}
}
/**
* WHERE 조건에 Primary Key 자동 추가 (컨텍스트 데이터 사용 시)
*
* 테이블의 실제 Primary Key를 자동으로 감지하여 WHERE 조건에 추가
*/
private static async enhanceWhereConditionsWithPK(
whereConditions: any[],
data: any,
tableName: string,
schema: string = "public"
): Promise<any[]> {
if (!data) {
console.log("⚠️ 입력 데이터가 없어 WHERE 조건 자동 추가 불가");
return whereConditions || [];
}
// 🔑 테이블의 실제 Primary Key 컬럼 조회
const pkColumns = await this.getPrimaryKeyColumns(tableName, schema);
if (pkColumns.length === 0) {
console.log(`⚠️ 테이블 ${tableName}에 Primary Key가 없어 자동 추가 불가`);
return whereConditions || [];
}
// 🔍 데이터에 모든 PK 컬럼이 있는지 확인
const missingPKColumns = pkColumns.filter(
(col) => data[col] === undefined || data[col] === null
);
if (missingPKColumns.length > 0) {
console.log(
`⚠️ 입력 데이터에 Primary Key 컬럼이 없어 자동 추가 불가: ${missingPKColumns.join(", ")}`
);
return whereConditions || [];
}
// 🔍 이미 WHERE 조건에 모든 PK가 포함되어 있는지 확인
const existingFields = new Set(
(whereConditions || []).map((cond: any) => cond.field)
);
const allPKsExist = pkColumns.every(
(col) =>
existingFields.has(col) || existingFields.has(`${tableName}.${col}`)
);
if (allPKsExist) {
console.log("✅ WHERE 조건에 이미 모든 Primary Key 포함, 추가하지 않음");
return whereConditions || [];
}
// 🔥 Primary Key 조건들을 맨 앞에 추가
const pkConditions = pkColumns.map((col) => ({
field: col,
operator: "EQUALS",
value: data[col],
}));
const enhanced = [...pkConditions, ...(whereConditions || [])];
const pkValues = pkColumns.map((col) => `${col} = ${data[col]}`).join(", ");
console.log(`🔑 WHERE 조건에 Primary Key 자동 추가: ${pkValues}`);
return enhanced;
}
private static buildWhereClause(
conditions: any[],
data?: any,
startIndex: number = 1
): { clause: string; values: any[] } {
if (!conditions || conditions.length === 0) {
return { clause: "", values: [] };
}
const values: any[] = [];
const clauses = conditions.map((condition, index) => {
// 🔥 수정: sourceField가 있으면 소스 데이터에서 값을 가져오고,
// 없으면 staticValue 또는 기존 field 사용
let value: any;
if (data) {
if (condition.sourceField) {
// sourceField가 있으면 소스 데이터에서 해당 필드의 값을 가져옴
value = data[condition.sourceField];
} else if (
condition.staticValue !== undefined &&
condition.staticValue !== ""
) {
// staticValue가 있으면 사용
value = condition.staticValue;
} else {
// 둘 다 없으면 기존 방식 (field로 값 조회)
value = data[condition.field];
}
} else {
value = condition.value;
}
values.push(value);
// 연산자를 SQL 문법으로 변환
let sqlOperator = condition.operator;
switch (condition.operator.toUpperCase()) {
case "EQUALS":
sqlOperator = "=";
break;
case "NOT_EQUALS":
case "NOTEQUALS":
sqlOperator = "!=";
break;
case "GREATER_THAN":
case "GREATERTHAN":
sqlOperator = ">";
break;
case "LESS_THAN":
case "LESSTHAN":
sqlOperator = "<";
break;
case "GREATER_THAN_OR_EQUAL":
case "GREATERTHANOREQUAL":
sqlOperator = ">=";
break;
case "LESS_THAN_OR_EQUAL":
case "LESSTHANOREQUAL":
sqlOperator = "<=";
break;
case "LIKE":
sqlOperator = "LIKE";
break;
case "NOT_LIKE":
case "NOTLIKE":
sqlOperator = "NOT LIKE";
break;
case "IN":
sqlOperator = "IN";
break;
case "NOT_IN":
case "NOTIN":
sqlOperator = "NOT IN";
break;
case "IS_NULL":
case "ISNULL":
return `${condition.field} IS NULL`;
case "IS_NOT_NULL":
case "ISNOTNULL":
return `${condition.field} IS NOT NULL`;
default:
// 이미 SQL 문법인 경우 (=, !=, >, < 등)
sqlOperator = condition.operator;
}
return `${condition.field} ${sqlOperator} $${startIndex + index}`;
});
return { clause: `WHERE ${clauses.join(" AND ")}`, values };
}
/**
* 조건 평가
*/
private static evaluateCondition(
fieldValue: any,
operator: string,
expectedValue: any
): boolean {
// NULL 체크
if (operator === "IS_NULL" || operator === "isNull") {
return (
fieldValue === null || fieldValue === undefined || fieldValue === ""
);
}
if (operator === "IS_NOT_NULL" || operator === "isNotNull") {
return (
fieldValue !== null && fieldValue !== undefined && fieldValue !== ""
);
}
// 비교 연산자: 타입 변환
const normalizedOperator = operator.toUpperCase();
switch (normalizedOperator) {
case "EQUALS":
case "=":
return fieldValue == expectedValue; // 느슨한 비교
case "NOT_EQUALS":
case "NOTEQUALS":
case "!=":
return fieldValue != expectedValue;
case "GREATER_THAN":
case "GREATERTHAN":
case ">":
return Number(fieldValue) > Number(expectedValue);
case "LESS_THAN":
case "LESSTHAN":
case "<":
return Number(fieldValue) < Number(expectedValue);
case "GREATER_THAN_OR_EQUAL":
case "GREATERTHANOREQUAL":
case ">=":
return Number(fieldValue) >= Number(expectedValue);
case "LESS_THAN_OR_EQUAL":
case "LESSTHANOREQUAL":
case "<=":
return Number(fieldValue) <= Number(expectedValue);
case "LIKE":
case "CONTAINS":
return String(fieldValue)
.toLowerCase()
.includes(String(expectedValue).toLowerCase());
case "NOT_LIKE":
case "NOTLIKE":
return !String(fieldValue)
.toLowerCase()
.includes(String(expectedValue).toLowerCase());
case "IN":
if (Array.isArray(expectedValue)) {
return expectedValue.includes(fieldValue);
}
// 쉼표로 구분된 문자열
const inValues = String(expectedValue)
.split(",")
.map((v) => v.trim());
return inValues.includes(String(fieldValue));
case "NOT_IN":
case "NOTIN":
if (Array.isArray(expectedValue)) {
return !expectedValue.includes(fieldValue);
}
const notInValues = String(expectedValue)
.split(",")
.map((v) => v.trim());
return !notInValues.includes(String(fieldValue));
default:
logger.warn(`⚠️ 지원되지 않는 연산자: ${operator}`);
return false;
}
}
/**
* 실행 결과 생성
*/
private static generateExecutionResult(
nodes: FlowNode[],
context: ExecutionContext,
executionTime: number
): ExecutionResult {
const nodeSummaries: NodeExecutionSummary[] = nodes.map((node) => {
const result = context.nodeResults.get(node.id);
return {
nodeId: node.id,
nodeName: node.data.displayName || node.id,
nodeType: node.type,
status: result?.status || "pending",
duration: result?.endTime
? result.endTime - result.startTime
: undefined,
error: result?.error?.message,
};
});
const summary = {
total: nodes.length,
success: nodeSummaries.filter((n) => n.status === "success").length,
failed: nodeSummaries.filter((n) => n.status === "failed").length,
skipped: nodeSummaries.filter((n) => n.status === "skipped").length,
};
const success = summary.failed === 0;
// 실패한 노드 상세 로깅
if (!success) {
const failedNodes = nodeSummaries.filter((n) => n.status === "failed");
logger.error(
`❌ 실패한 노드들:`,
failedNodes.map((n) => ({
nodeId: n.nodeId,
nodeName: n.nodeName,
nodeType: n.nodeType,
error: n.error,
}))
);
}
return {
success,
message: success
? `플로우 실행 성공 (${summary.success}/${summary.total})`
: `플로우 실행 부분 실패 (성공: ${summary.success}, 실패: ${summary.failed})`,
executionTime,
nodes: nodeSummaries,
summary,
};
}
/**
* 데이터 변환 노드 실행
*/
private static async executeDataTransform(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any[]> {
const { transformations } = node.data;
if (
!transformations ||
!Array.isArray(transformations) ||
transformations.length === 0
) {
logger.warn(`⚠️ 데이터 변환 노드에 변환 규칙이 없습니다: ${node.id}`);
return Array.isArray(inputData) ? inputData : [inputData];
}
// inputData를 배열로 정규화
const rows = Array.isArray(inputData) ? inputData : [inputData];
logger.info(
`🔄 데이터 변환 시작: ${rows.length}개 행, ${transformations.length}개 변환`
);
// 각 변환 규칙을 순차적으로 적용
let transformedRows = rows;
for (const transform of transformations) {
const transformType = transform.type;
logger.info(` 🔹 변환 적용: ${transformType}`);
transformedRows = this.applyTransformation(transformedRows, transform);
}
logger.info(`✅ 데이터 변환 완료: ${transformedRows.length}개 행`);
return transformedRows;
}
/**
* 단일 변환 규칙 적용
*/
private static applyTransformation(rows: any[], transform: any): any[] {
const {
type,
sourceField,
targetField,
delimiter,
separator,
searchValue,
replaceValue,
splitIndex,
castType,
expression,
} = transform;
// 타겟 필드 결정 (비어있으면 소스 필드 사용 = in-place)
const actualTargetField = targetField || sourceField;
switch (type) {
case "UPPERCASE":
return rows.map((row) => ({
...row,
[actualTargetField]:
row[sourceField]?.toString().toUpperCase() || row[sourceField],
}));
case "LOWERCASE":
return rows.map((row) => ({
...row,
[actualTargetField]:
row[sourceField]?.toString().toLowerCase() || row[sourceField],
}));
case "TRIM":
return rows.map((row) => ({
...row,
[actualTargetField]:
row[sourceField]?.toString().trim() || row[sourceField],
}));
case "EXPLODE":
return this.applyExplode(
rows,
sourceField,
actualTargetField,
delimiter || ","
);
case "CONCAT":
return rows.map((row) => {
const value1 = row[sourceField] || "";
// CONCAT은 여러 필드를 합칠 수 있지만, 단순화하여 expression 사용
const value2 = expression || "";
return {
...row,
[actualTargetField]: `${value1}${separator || ""}${value2}`,
};
});
case "SPLIT":
return rows.map((row) => {
const value = row[sourceField]?.toString() || "";
const parts = value.split(delimiter || ",");
const index = splitIndex !== undefined ? splitIndex : 0;
return {
...row,
[actualTargetField]: parts[index] || "",
};
});
case "REPLACE":
return rows.map((row) => {
const value = row[sourceField]?.toString() || "";
const replaced = value.replace(
new RegExp(searchValue || "", "g"),
replaceValue || ""
);
return {
...row,
[actualTargetField]: replaced,
};
});
case "CAST":
return rows.map((row) => {
const value = row[sourceField];
let castedValue = value;
switch (castType) {
case "string":
castedValue = value?.toString() || "";
break;
case "number":
castedValue = parseFloat(value) || 0;
break;
case "boolean":
castedValue = Boolean(value);
break;
case "date":
castedValue = new Date(value);
break;
}
return {
...row,
[actualTargetField]: castedValue,
};
});
case "FORMAT":
case "CALCULATE":
case "JSON_EXTRACT":
case "CUSTOM":
// 표현식 기반 변환 (현재는 단순 구현)
logger.warn(`⚠️ ${type} 변환은 아직 완전히 구현되지 않았습니다.`);
return rows.map((row) => ({
...row,
[actualTargetField]: row[sourceField] || "",
}));
default:
logger.warn(`⚠️ 지원하지 않는 변환 타입: ${type}`);
return rows;
}
}
/**
* EXPLODE 변환: 1개 행을 N개 행으로 확장
*/
private static applyExplode(
rows: any[],
sourceField: string,
targetField: string,
delimiter: string
): any[] {
const expandedRows: any[] = [];
for (const row of rows) {
const value = row[sourceField];
if (!value) {
// 값이 없으면 원본 행 유지
expandedRows.push(row);
continue;
}
// 문자열을 구분자로 분리
const values = value
.toString()
.split(delimiter)
.map((v: string) => v.trim());
// 각 값마다 새 행 생성
for (const val of values) {
expandedRows.push({
...row, // 다른 필드들은 복제
[targetField]: val, // 타겟 필드에 분리된 값 저장
});
}
}
logger.info(
` 💥 EXPLODE: ${rows.length}개 행 → ${expandedRows.length}개 행`
);
return expandedRows;
}
/**
* 🔥 액션 노드 여부 확인
*/
private static isActionNode(nodeType: NodeType): boolean {
return [
"insertAction",
"updateAction",
"deleteAction",
"upsertAction",
].includes(nodeType);
}
/**
* 집계 노드 실행 (SUM, COUNT, AVG, MIN, MAX 등)
*/
private static async executeAggregate(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any[]> {
const {
groupByFields = [],
aggregations = [],
havingConditions = [],
} = node.data;
logger.info(`📊 집계 노드 실행: ${node.data.displayName || node.id}`);
// 입력 데이터가 없으면 빈 배열 반환
if (!inputData || !Array.isArray(inputData) || inputData.length === 0) {
logger.warn("⚠️ 집계할 입력 데이터가 없습니다.");
logger.warn(
`⚠️ inputData 타입: ${typeof inputData}, 값: ${JSON.stringify(inputData)?.substring(0, 200)}`
);
return [];
}
logger.info(`📥 입력 데이터: ${inputData.length}`);
logger.info(
`📥 입력 데이터 샘플: ${JSON.stringify(inputData[0])?.substring(0, 300)}`
);
logger.info(
`📊 그룹 기준: ${groupByFields.length > 0 ? groupByFields.map((f: any) => f.field).join(", ") : "전체"}`
);
logger.info(`📊 집계 연산: ${aggregations.length}`);
// 그룹화 수행
const groups = new Map<string, any[]>();
for (const row of inputData) {
// 그룹 키 생성
const groupKey =
groupByFields.length > 0
? groupByFields
.map((f: any) => String(row[f.field] ?? ""))
.join("|||")
: "__ALL__";
if (!groups.has(groupKey)) {
groups.set(groupKey, []);
}
groups.get(groupKey)!.push(row);
}
logger.info(`📊 그룹 수: ${groups.size}`);
// 디버깅: 각 그룹의 데이터 출력
for (const [groupKey, groupRows] of groups) {
logger.info(
`📊 그룹 [${groupKey}]: ${groupRows.length}건, inbound_qty 합계: ${groupRows.reduce((sum, row) => sum + parseFloat(row.inbound_qty || 0), 0)}`
);
}
// 각 그룹에 대해 집계 수행
const results: any[] = [];
for (const [groupKey, groupRows] of groups) {
const resultRow: any = {};
// 그룹 기준 필드값 추가
if (groupByFields.length > 0) {
const keyValues = groupKey.split("|||");
groupByFields.forEach((field: any, idx: number) => {
resultRow[field.field] = keyValues[idx];
});
}
// 각 집계 연산 수행
for (const agg of aggregations) {
const { sourceField, function: aggFunc, outputField } = agg;
if (!outputField) continue;
let aggregatedValue: any;
switch (aggFunc) {
case "SUM":
aggregatedValue = groupRows.reduce((sum: number, row: any) => {
const val = parseFloat(row[sourceField]);
return sum + (isNaN(val) ? 0 : val);
}, 0);
break;
case "COUNT":
aggregatedValue = groupRows.length;
break;
case "AVG":
const sum = groupRows.reduce((acc: number, row: any) => {
const val = parseFloat(row[sourceField]);
return acc + (isNaN(val) ? 0 : val);
}, 0);
aggregatedValue = groupRows.length > 0 ? sum / groupRows.length : 0;
break;
case "MIN":
aggregatedValue = groupRows.reduce(
(min: number | null, row: any) => {
const val = parseFloat(row[sourceField]);
if (isNaN(val)) return min;
return min === null ? val : Math.min(min, val);
},
null
);
break;
case "MAX":
aggregatedValue = groupRows.reduce(
(max: number | null, row: any) => {
const val = parseFloat(row[sourceField]);
if (isNaN(val)) return max;
return max === null ? val : Math.max(max, val);
},
null
);
break;
case "FIRST":
aggregatedValue =
groupRows.length > 0 ? groupRows[0][sourceField] : null;
break;
case "LAST":
aggregatedValue =
groupRows.length > 0
? groupRows[groupRows.length - 1][sourceField]
: null;
break;
default:
logger.warn(`⚠️ 지원하지 않는 집계 함수: ${aggFunc}`);
aggregatedValue = null;
}
resultRow[outputField] = aggregatedValue;
logger.info(
` ${aggFunc}(${sourceField}) → ${outputField}: ${aggregatedValue}`
);
}
results.push(resultRow);
}
// HAVING 조건 적용 (집계 후 필터링)
let filteredResults = results;
if (havingConditions && havingConditions.length > 0) {
filteredResults = results.filter((row) => {
return havingConditions.every((condition: any) => {
const fieldValue = row[condition.field];
const compareValue = parseFloat(condition.value);
switch (condition.operator) {
case "=":
return fieldValue === compareValue;
case "!=":
return fieldValue !== compareValue;
case ">":
return fieldValue > compareValue;
case ">=":
return fieldValue >= compareValue;
case "<":
return fieldValue < compareValue;
case "<=":
return fieldValue <= compareValue;
default:
return true;
}
});
});
logger.info(
`📊 HAVING 필터링: ${results.length}건 → ${filteredResults.length}`
);
}
logger.info(`✅ 집계 완료: ${filteredResults.length}건 결과`);
// 결과 샘플 출력
if (filteredResults.length > 0) {
logger.info(`📄 결과 샘플:`, JSON.stringify(filteredResults[0], null, 2));
}
return filteredResults;
}
// ===================================================================
// 외부 연동 액션 노드들
// ===================================================================
/**
* 이메일 발송 액션 노드 실행
*/
private static async executeEmailAction(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const {
from,
to,
cc,
bcc,
subject,
body,
bodyType,
isHtml, // 레거시 지원
accountId: nodeAccountId, // 프론트엔드에서 선택한 계정 ID
smtpConfigId, // 레거시 지원
attachments,
templateVariables,
} = node.data;
logger.info(
`📧 이메일 발송 노드 실행: ${node.data.displayName || node.id}`
);
// 입력 데이터를 배열로 정규화
const dataArray = Array.isArray(inputData)
? inputData
: inputData
? [inputData]
: [{}];
const results: any[] = [];
// 동적 임포트로 순환 참조 방지
const { mailSendSimpleService } = await import("./mailSendSimpleService");
const { mailAccountFileService } = await import("./mailAccountFileService");
// 계정 ID 우선순위: nodeAccountId > smtpConfigId > 첫 번째 활성 계정
let accountId = nodeAccountId || smtpConfigId;
if (!accountId) {
const accounts = await mailAccountFileService.getAllAccounts();
const activeAccount = accounts.find(
(acc: any) => acc.status === "active"
);
if (activeAccount) {
accountId = activeAccount.id;
logger.info(
`📧 자동 선택된 메일 계정: ${activeAccount.name} (${activeAccount.email})`
);
} else {
throw new Error(
"활성화된 메일 계정이 없습니다. 메일 계정을 먼저 설정해주세요."
);
}
}
// HTML 여부 판단 (bodyType 우선, isHtml 레거시 지원)
const useHtml = bodyType === "html" || isHtml === true;
for (const data of dataArray) {
try {
// 템플릿 변수 치환
const processedSubject = this.replaceTemplateVariables(
subject || "",
data
);
const processedBody = this.replaceTemplateVariables(body || "", data);
const processedTo = this.replaceTemplateVariables(to || "", data);
const processedCc = cc
? this.replaceTemplateVariables(cc, data)
: undefined;
const processedBcc = bcc
? this.replaceTemplateVariables(bcc, data)
: undefined;
// 수신자 파싱 (쉼표로 구분)
const toList = processedTo
.split(",")
.map((email: string) => email.trim())
.filter((email: string) => email);
const ccList = processedCc
? processedCc
.split(",")
.map((email: string) => email.trim())
.filter((email: string) => email)
: undefined;
const bccList = processedBcc
? processedBcc
.split(",")
.map((email: string) => email.trim())
.filter((email: string) => email)
: undefined;
if (toList.length === 0) {
throw new Error("수신자 이메일 주소가 지정되지 않았습니다.");
}
// 메일 발송 요청
const sendResult = await mailSendSimpleService.sendMail({
accountId,
to: toList,
cc: ccList,
bcc: bccList,
subject: processedSubject,
customHtml: useHtml ? processedBody : `<pre>${processedBody}</pre>`,
attachments: attachments?.map((att: any) => ({
filename: att.type === "dataField" ? data[att.value] : att.value,
path: att.type === "dataField" ? data[att.value] : att.value,
})),
});
if (sendResult.success) {
logger.info(`✅ 이메일 발송 성공: ${toList.join(", ")}`);
results.push({
success: true,
to: toList,
messageId: sendResult.messageId,
});
} else {
logger.error(`❌ 이메일 발송 실패: ${sendResult.error}`);
results.push({
success: false,
to: toList,
error: sendResult.error,
});
}
} catch (error: any) {
logger.error(`❌ 이메일 발송 오류:`, error);
results.push({
success: false,
error: error.message,
});
}
}
const successCount = results.filter((r) => r.success).length;
const failedCount = results.filter((r) => !r.success).length;
logger.info(
`📧 이메일 발송 완료: 성공 ${successCount}건, 실패 ${failedCount}`
);
return {
action: "emailAction",
totalCount: results.length,
successCount,
failedCount,
results,
};
}
/**
* 스크립트 실행 액션 노드 실행
*/
private static async executeScriptAction(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const {
scriptType,
scriptPath,
arguments: scriptArgs,
workingDirectory,
environmentVariables,
timeout,
captureOutput,
} = node.data;
logger.info(
`🖥️ 스크립트 실행 노드 실행: ${node.data.displayName || node.id}`
);
logger.info(` 스크립트 타입: ${scriptType}, 경로: ${scriptPath}`);
if (!scriptPath) {
throw new Error("스크립트 경로가 지정되지 않았습니다.");
}
// 입력 데이터를 배열로 정규화
const dataArray = Array.isArray(inputData)
? inputData
: inputData
? [inputData]
: [{}];
const results: any[] = [];
// child_process 모듈 동적 임포트
const { spawn } = await import("child_process");
const path = await import("path");
for (const data of dataArray) {
try {
// 인자 처리
const processedArgs: string[] = [];
if (scriptArgs && Array.isArray(scriptArgs)) {
for (const arg of scriptArgs) {
if (arg.type === "dataField") {
// 데이터 필드 참조
const value = this.replaceTemplateVariables(arg.value, data);
processedArgs.push(value);
} else {
processedArgs.push(arg.value);
}
}
}
// 환경 변수 처리
const env = {
...process.env,
...(environmentVariables || {}),
};
// 스크립트 타입에 따른 명령어 결정
let command: string;
let args: string[];
switch (scriptType) {
case "python":
command = "python3";
args = [scriptPath, ...processedArgs];
break;
case "shell":
command = "bash";
args = [scriptPath, ...processedArgs];
break;
case "executable":
command = scriptPath;
args = processedArgs;
break;
default:
throw new Error(`지원하지 않는 스크립트 타입: ${scriptType}`);
}
logger.info(` 실행 명령: ${command} ${args.join(" ")}`);
// 스크립트 실행 (Promise로 래핑)
const result = await new Promise<{
exitCode: number | null;
stdout: string;
stderr: string;
}>((resolve, reject) => {
const childProcess = spawn(command, args, {
cwd: workingDirectory || process.cwd(),
env,
timeout: timeout || 60000, // 기본 60초
});
let stdout = "";
let stderr = "";
if (captureOutput !== false) {
childProcess.stdout?.on("data", (data) => {
stdout += data.toString();
});
childProcess.stderr?.on("data", (data) => {
stderr += data.toString();
});
}
childProcess.on("close", (code) => {
resolve({ exitCode: code, stdout, stderr });
});
childProcess.on("error", (error) => {
reject(error);
});
});
if (result.exitCode === 0) {
logger.info(`✅ 스크립트 실행 성공 (종료 코드: ${result.exitCode})`);
results.push({
success: true,
exitCode: result.exitCode,
stdout: result.stdout,
stderr: result.stderr,
data,
});
} else {
logger.warn(`⚠️ 스크립트 실행 완료 (종료 코드: ${result.exitCode})`);
results.push({
success: false,
exitCode: result.exitCode,
stdout: result.stdout,
stderr: result.stderr,
data,
});
}
} catch (error: any) {
logger.error(`❌ 스크립트 실행 오류:`, error);
results.push({
success: false,
error: error.message,
data,
});
}
}
const successCount = results.filter((r) => r.success).length;
const failedCount = results.filter((r) => !r.success).length;
logger.info(
`🖥️ 스크립트 실행 완료: 성공 ${successCount}건, 실패 ${failedCount}`
);
return {
action: "scriptAction",
scriptType,
scriptPath,
totalCount: results.length,
successCount,
failedCount,
results,
};
}
/**
* HTTP 요청 액션 노드 실행
*/
private static async executeHttpRequestAction(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any> {
const {
url,
method,
headers,
bodyTemplate,
bodyType,
authentication,
timeout,
retryCount,
responseMapping,
} = node.data;
logger.info(`🌐 HTTP 요청 노드 실행: ${node.data.displayName || node.id}`);
logger.info(` 메서드: ${method}, URL: ${url}`);
if (!url) {
throw new Error("HTTP 요청 URL이 지정되지 않았습니다.");
}
// 입력 데이터를 배열로 정규화
const dataArray = Array.isArray(inputData)
? inputData
: inputData
? [inputData]
: [{}];
const results: any[] = [];
for (const data of dataArray) {
let currentRetry = 0;
const maxRetries = retryCount || 0;
while (currentRetry <= maxRetries) {
try {
// URL 템플릿 변수 치환
const processedUrl = this.replaceTemplateVariables(url, data);
// 헤더 처리
const processedHeaders: Record<string, string> = {};
if (headers && Array.isArray(headers)) {
for (const header of headers) {
const headerValue =
header.valueType === "dataField"
? this.replaceTemplateVariables(header.value, data)
: header.value;
processedHeaders[header.name] = headerValue;
}
}
// 인증 헤더 추가
if (authentication) {
switch (authentication.type) {
case "basic":
if (authentication.username && authentication.password) {
const credentials = Buffer.from(
`${authentication.username}:${authentication.password}`
).toString("base64");
processedHeaders["Authorization"] = `Basic ${credentials}`;
}
break;
case "bearer":
if (authentication.token) {
processedHeaders["Authorization"] =
`Bearer ${authentication.token}`;
}
break;
case "apikey":
if (authentication.apiKey) {
if (authentication.apiKeyLocation === "query") {
// 쿼리 파라미터로 추가 (URL에 추가)
const paramName =
authentication.apiKeyQueryParam || "api_key";
const separator = processedUrl.includes("?") ? "&" : "?";
// URL은 이미 처리되었으므로 여기서는 결과에 포함
} else {
// 헤더로 추가
const headerName =
authentication.apiKeyHeader || "X-API-Key";
processedHeaders[headerName] = authentication.apiKey;
}
}
break;
}
}
// Content-Type 기본값
if (
!processedHeaders["Content-Type"] &&
["POST", "PUT", "PATCH"].includes(method)
) {
processedHeaders["Content-Type"] =
bodyType === "json" ? "application/json" : "text/plain";
}
// 바디 처리
let processedBody: string | undefined;
if (["POST", "PUT", "PATCH"].includes(method) && bodyTemplate) {
processedBody = this.replaceTemplateVariables(bodyTemplate, data);
}
logger.info(` 요청 URL: ${processedUrl}`);
logger.info(` 요청 헤더: ${JSON.stringify(processedHeaders)}`);
if (processedBody) {
logger.info(` 요청 바디: ${processedBody.substring(0, 200)}...`);
}
// HTTP 요청 실행
const response = await axios({
method: method.toLowerCase() as any,
url: processedUrl,
headers: processedHeaders,
data: processedBody,
timeout: timeout || 30000,
validateStatus: () => true, // 모든 상태 코드 허용
});
logger.info(
` 응답 상태: ${response.status} ${response.statusText}`
);
// 응답 데이터 처리
let responseData = response.data;
// 응답 매핑 적용
if (responseMapping && responseData) {
const paths = responseMapping.split(".");
for (const path of paths) {
if (
responseData &&
typeof responseData === "object" &&
path in responseData
) {
responseData = responseData[path];
} else {
logger.warn(
`⚠️ 응답 매핑 경로를 찾을 수 없습니다: ${responseMapping}`
);
break;
}
}
}
const isSuccess = response.status >= 200 && response.status < 300;
if (isSuccess) {
logger.info(`✅ HTTP 요청 성공`);
results.push({
success: true,
statusCode: response.status,
data: responseData,
inputData: data,
});
break; // 성공 시 재시도 루프 종료
} else {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
} catch (error: any) {
currentRetry++;
if (currentRetry > maxRetries) {
logger.error(
`❌ HTTP 요청 실패 (재시도 ${currentRetry - 1}/${maxRetries}):`,
error.message
);
results.push({
success: false,
error: error.message,
inputData: data,
});
} else {
logger.warn(
`⚠️ HTTP 요청 재시도 (${currentRetry}/${maxRetries}): ${error.message}`
);
// 재시도 전 잠시 대기
await new Promise((resolve) =>
setTimeout(resolve, 1000 * currentRetry)
);
}
}
}
}
const successCount = results.filter((r) => r.success).length;
const failedCount = results.filter((r) => !r.success).length;
logger.info(
`🌐 HTTP 요청 완료: 성공 ${successCount}건, 실패 ${failedCount}`
);
return {
action: "httpRequestAction",
method,
url,
totalCount: results.length,
successCount,
failedCount,
results,
};
}
/**
* 수식 변환 노드 실행
* - 타겟 테이블에서 기존 값 조회 (targetLookup)
* - 산술 연산, 함수, 조건, 정적 값 계산
*/
private static async executeFormulaTransform(
node: FlowNode,
inputData: any,
context: ExecutionContext
): Promise<any[]> {
const { targetLookup, transformations = [] } = node.data;
logger.info(`🧮 수식 변환 노드 실행: ${node.data.displayName || node.id}`);
logger.info(` 변환 규칙: ${transformations.length}`);
// 입력 데이터를 배열로 정규화
const dataArray = Array.isArray(inputData)
? inputData
: inputData
? [inputData]
: [];
if (dataArray.length === 0) {
logger.warn(`⚠️ 수식 변환 노드: 입력 데이터가 없습니다`);
return [];
}
const results: any[] = [];
for (const sourceRow of dataArray) {
let targetRow: any = null;
// 타겟 테이블에서 기존 값 조회
if (targetLookup?.tableName && targetLookup?.lookupKeys?.length > 0) {
try {
const whereConditions = targetLookup.lookupKeys
.map((key: any, idx: number) => `${key.targetField} = $${idx + 1}`)
.join(" AND ");
const lookupValues = targetLookup.lookupKeys.map(
(key: any) => sourceRow[key.sourceField]
);
// company_code 필터링 추가
const companyCode =
context.buttonContext?.companyCode || sourceRow.company_code;
let sql = `SELECT * FROM ${targetLookup.tableName} WHERE ${whereConditions}`;
const params = [...lookupValues];
if (companyCode && companyCode !== "*") {
sql += ` AND company_code = $${params.length + 1}`;
params.push(companyCode);
}
sql += " LIMIT 1";
logger.info(` 타겟 조회: ${targetLookup.tableName}`);
logger.info(` 조회 조건: ${whereConditions}`);
logger.info(` 조회 값: ${JSON.stringify(lookupValues)}`);
targetRow = await queryOne(sql, params);
if (targetRow) {
logger.info(` ✅ 타겟 데이터 조회 성공`);
} else {
logger.info(` 타겟 데이터 없음 (신규)`);
}
} catch (error: any) {
logger.warn(` ⚠️ 타겟 조회 실패: ${error.message}`);
}
}
// 결과 객체 (소스 데이터 복사)
const resultRow = { ...sourceRow };
// 중간 결과 저장소 (이전 변환 결과 참조용)
const resultValues: Record<string, any> = {};
// 변환 규칙 순차 실행
for (const trans of transformations) {
try {
const value = this.evaluateFormula(
trans,
sourceRow,
targetRow,
resultValues
);
resultRow[trans.outputField] = value;
resultValues[trans.outputField] = value;
logger.info(
` ${trans.outputField} = ${JSON.stringify(value)} (${trans.formulaType})`
);
} catch (error: any) {
logger.error(
` ❌ 수식 계산 실패 [${trans.outputField}]: ${error.message}`
);
resultRow[trans.outputField] = null;
}
}
results.push(resultRow);
}
logger.info(`✅ 수식 변환 완료: ${results.length}`);
return results;
}
/**
* 수식 계산
*/
private static evaluateFormula(
trans: any,
sourceRow: any,
targetRow: any,
resultValues: Record<string, any>
): any {
const {
formulaType,
arithmetic,
function: func,
condition,
staticValue,
} = trans;
switch (formulaType) {
case "arithmetic":
return this.evaluateArithmetic(
arithmetic,
sourceRow,
targetRow,
resultValues
);
case "function":
return this.evaluateFunction(func, sourceRow, targetRow, resultValues);
case "condition":
return this.evaluateCaseCondition(
condition,
sourceRow,
targetRow,
resultValues
);
case "static":
return this.parseStaticValue(staticValue);
default:
throw new Error(`지원하지 않는 수식 타입: ${formulaType}`);
}
}
/**
* 피연산자 값 가져오기
*/
private static getOperandValue(
operand: any,
sourceRow: any,
targetRow: any,
resultValues: Record<string, any>
): any {
if (!operand) return null;
switch (operand.type) {
case "source":
return sourceRow?.[operand.field] ?? null;
case "target":
return targetRow?.[operand.field] ?? null;
case "static":
return this.parseStaticValue(operand.value);
case "result":
return resultValues?.[operand.resultField] ?? null;
default:
return null;
}
}
/**
* 정적 값 파싱 (숫자, 불린, 문자열)
*/
private static parseStaticValue(value: any): any {
if (value === null || value === undefined || value === "") return null;
// 숫자 체크
const numValue = Number(value);
if (!isNaN(numValue) && value !== "") return numValue;
// 불린 체크
if (value === "true") return true;
if (value === "false") return false;
// 문자열 반환
return value;
}
/**
* 산술 연산 계산
*/
private static evaluateArithmetic(
arithmetic: any,
sourceRow: any,
targetRow: any,
resultValues: Record<string, any>
): number | null {
if (!arithmetic) return null;
const left = this.getOperandValue(
arithmetic.leftOperand,
sourceRow,
targetRow,
resultValues
);
const right = this.getOperandValue(
arithmetic.rightOperand,
sourceRow,
targetRow,
resultValues
);
// COALESCE 처리: null이면 0으로 대체
const leftNum = Number(left) || 0;
const rightNum = Number(right) || 0;
switch (arithmetic.operator) {
case "+":
return leftNum + rightNum;
case "-":
return leftNum - rightNum;
case "*":
return leftNum * rightNum;
case "/":
if (rightNum === 0) {
logger.warn(`⚠️ 0으로 나누기 시도`);
return null;
}
return leftNum / rightNum;
case "%":
if (rightNum === 0) {
logger.warn(`⚠️ 0으로 나머지 연산 시도`);
return null;
}
return leftNum % rightNum;
default:
throw new Error(`지원하지 않는 연산자: ${arithmetic.operator}`);
}
}
/**
* 함수 실행
*/
private static evaluateFunction(
func: any,
sourceRow: any,
targetRow: any,
resultValues: Record<string, any>
): any {
if (!func) return null;
const args = (func.arguments || []).map((arg: any) =>
this.getOperandValue(arg, sourceRow, targetRow, resultValues)
);
switch (func.name) {
case "NOW":
return new Date().toISOString();
case "COALESCE":
// 첫 번째 non-null 값 반환
for (const arg of args) {
if (arg !== null && arg !== undefined) return arg;
}
return null;
case "CONCAT":
return args.filter((a: any) => a !== null && a !== undefined).join("");
case "UPPER":
return args[0] ? String(args[0]).toUpperCase() : null;
case "LOWER":
return args[0] ? String(args[0]).toLowerCase() : null;
case "TRIM":
return args[0] ? String(args[0]).trim() : null;
case "ROUND":
return args[0] !== null ? Math.round(Number(args[0])) : null;
case "ABS":
return args[0] !== null ? Math.abs(Number(args[0])) : null;
case "SUBSTRING":
if (args[0] && args[1] !== undefined) {
const str = String(args[0]);
const start = Number(args[1]) || 0;
const length = args[2] !== undefined ? Number(args[2]) : undefined;
return length !== undefined
? str.substring(start, start + length)
: str.substring(start);
}
return null;
default:
throw new Error(`지원하지 않는 함수: ${func.name}`);
}
}
/**
* 조건 평가 (CASE WHEN ... THEN ... ELSE)
*/
private static evaluateCaseCondition(
condition: any,
sourceRow: any,
targetRow: any,
resultValues: Record<string, any>
): any {
if (!condition) return null;
const { when, then: thenValue, else: elseValue } = condition;
// WHEN 조건 평가
const leftValue = this.getOperandValue(
when.leftOperand,
sourceRow,
targetRow,
resultValues
);
const rightValue = when.rightOperand
? this.getOperandValue(
when.rightOperand,
sourceRow,
targetRow,
resultValues
)
: null;
let conditionResult = false;
switch (when.operator) {
case "=":
conditionResult = leftValue == rightValue;
break;
case "!=":
conditionResult = leftValue != rightValue;
break;
case ">":
conditionResult = Number(leftValue) > Number(rightValue);
break;
case "<":
conditionResult = Number(leftValue) < Number(rightValue);
break;
case ">=":
conditionResult = Number(leftValue) >= Number(rightValue);
break;
case "<=":
conditionResult = Number(leftValue) <= Number(rightValue);
break;
case "IS_NULL":
conditionResult = leftValue === null || leftValue === undefined;
break;
case "IS_NOT_NULL":
conditionResult = leftValue !== null && leftValue !== undefined;
break;
default:
throw new Error(`지원하지 않는 조건 연산자: ${when.operator}`);
}
// THEN 또는 ELSE 값 반환
if (conditionResult) {
return this.getOperandValue(
thenValue,
sourceRow,
targetRow,
resultValues
);
} else {
return this.getOperandValue(
elseValue,
sourceRow,
targetRow,
resultValues
);
}
}
}