1074 lines
29 KiB
TypeScript
1074 lines
29 KiB
TypeScript
|
|
/**
|
|||
|
|
* 노드 플로우 실행 엔진
|
|||
|
|
*
|
|||
|
|
* 기능:
|
|||
|
|
* - 위상 정렬 (Topological Sort)
|
|||
|
|
* - 레벨별 병렬 실행 (Promise.allSettled)
|
|||
|
|
* - 독립 트랜잭션 처리
|
|||
|
|
* - 연쇄 중단 (부모 실패 시 자식 스킵)
|
|||
|
|
*/
|
|||
|
|
|
|||
|
|
import { query, queryOne, transaction } from "../database/db";
|
|||
|
|
import { logger } from "../utils/logger";
|
|||
|
|
|
|||
|
|
// ===== 타입 정의 =====
|
|||
|
|
|
|||
|
|
export interface FlowNode {
|
|||
|
|
id: string;
|
|||
|
|
type: NodeType;
|
|||
|
|
position?: { x: number; y: number };
|
|||
|
|
data: NodeData;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
export type NodeType =
|
|||
|
|
| "tableSource"
|
|||
|
|
| "externalDBSource"
|
|||
|
|
| "restAPISource"
|
|||
|
|
| "condition"
|
|||
|
|
| "fieldMapping"
|
|||
|
|
| "dataTransform"
|
|||
|
|
| "insertAction"
|
|||
|
|
| "updateAction"
|
|||
|
|
| "deleteAction"
|
|||
|
|
| "upsertAction"
|
|||
|
|
| "comment"
|
|||
|
|
| "log";
|
|||
|
|
|
|||
|
|
export interface NodeData {
|
|||
|
|
displayName?: string;
|
|||
|
|
[key: string]: any;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
export interface FlowEdge {
|
|||
|
|
id: string;
|
|||
|
|
source: string;
|
|||
|
|
target: string;
|
|||
|
|
sourceHandle?: string;
|
|||
|
|
targetHandle?: string;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
export interface ExecutionContext {
|
|||
|
|
sourceData?: any[]; // 외부에서 주입된 데이터 (선택된 행 또는 폼 데이터)
|
|||
|
|
dataSourceType?: string; // "table-selection" | "form" | "none"
|
|||
|
|
nodeResults: Map<string, NodeResult>;
|
|||
|
|
executionOrder: string[];
|
|||
|
|
buttonContext?: ButtonContext;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
export interface ButtonContext {
|
|||
|
|
buttonId: string;
|
|||
|
|
screenId?: number;
|
|||
|
|
companyCode?: string;
|
|||
|
|
userId?: string;
|
|||
|
|
formData?: Record<string, any>;
|
|||
|
|
selectedRowsData?: Record<string, any>[];
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
export interface NodeResult {
|
|||
|
|
nodeId: string;
|
|||
|
|
status: "pending" | "success" | "failed" | "skipped";
|
|||
|
|
data?: any;
|
|||
|
|
error?: Error;
|
|||
|
|
startTime: number;
|
|||
|
|
endTime?: number;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
export interface ExecutionResult {
|
|||
|
|
success: boolean;
|
|||
|
|
message: string;
|
|||
|
|
executionTime: number;
|
|||
|
|
nodes: NodeExecutionSummary[];
|
|||
|
|
summary: {
|
|||
|
|
total: number;
|
|||
|
|
success: number;
|
|||
|
|
failed: number;
|
|||
|
|
skipped: number;
|
|||
|
|
};
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
export interface NodeExecutionSummary {
|
|||
|
|
nodeId: string;
|
|||
|
|
nodeName: string;
|
|||
|
|
nodeType: NodeType;
|
|||
|
|
status: "success" | "failed" | "skipped" | "pending";
|
|||
|
|
duration?: number;
|
|||
|
|
error?: string;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ===== 메인 실행 서비스 =====
|
|||
|
|
|
|||
|
|
export class NodeFlowExecutionService {
|
|||
|
|
/**
|
|||
|
|
* 플로우 실행 메인 함수
|
|||
|
|
*/
|
|||
|
|
static async executeFlow(
|
|||
|
|
flowId: number,
|
|||
|
|
contextData: Record<string, any>
|
|||
|
|
): Promise<ExecutionResult> {
|
|||
|
|
const startTime = Date.now();
|
|||
|
|
|
|||
|
|
try {
|
|||
|
|
logger.info(`🚀 플로우 실행 시작: flowId=${flowId}`);
|
|||
|
|
|
|||
|
|
// 1. 플로우 데이터 조회
|
|||
|
|
const flow = await queryOne<{
|
|||
|
|
flow_id: number;
|
|||
|
|
flow_name: string;
|
|||
|
|
flow_data: any;
|
|||
|
|
}>(
|
|||
|
|
`SELECT flow_id, flow_name, flow_data FROM node_flows WHERE flow_id = $1`,
|
|||
|
|
[flowId]
|
|||
|
|
);
|
|||
|
|
|
|||
|
|
if (!flow) {
|
|||
|
|
throw new Error(`플로우를 찾을 수 없습니다: flowId=${flowId}`);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
const flowData =
|
|||
|
|
typeof flow.flow_data === "string"
|
|||
|
|
? JSON.parse(flow.flow_data)
|
|||
|
|
: flow.flow_data;
|
|||
|
|
|
|||
|
|
const { nodes, edges } = flowData;
|
|||
|
|
|
|||
|
|
logger.info(`📊 플로우 정보:`, {
|
|||
|
|
flowName: flow.flow_name,
|
|||
|
|
nodeCount: nodes.length,
|
|||
|
|
edgeCount: edges.length,
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// 2. 실행 컨텍스트 준비
|
|||
|
|
const context: ExecutionContext = {
|
|||
|
|
sourceData: contextData.sourceData || [],
|
|||
|
|
dataSourceType: contextData.dataSourceType || "none",
|
|||
|
|
nodeResults: new Map(),
|
|||
|
|
executionOrder: [],
|
|||
|
|
buttonContext: {
|
|||
|
|
buttonId:
|
|||
|
|
contextData.buttonId || contextData.context?.buttonId || "unknown",
|
|||
|
|
screenId: contextData.screenId || contextData.context?.screenId,
|
|||
|
|
companyCode:
|
|||
|
|
contextData.companyCode || contextData.context?.companyCode,
|
|||
|
|
userId: contextData.userId || contextData.context?.userId,
|
|||
|
|
formData: contextData.formData || contextData.context?.formData,
|
|||
|
|
selectedRowsData:
|
|||
|
|
contextData.selectedRowsData ||
|
|||
|
|
contextData.context?.selectedRowsData,
|
|||
|
|
},
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
logger.info(`📦 실행 컨텍스트:`, {
|
|||
|
|
dataSourceType: context.dataSourceType,
|
|||
|
|
sourceDataCount: context.sourceData?.length || 0,
|
|||
|
|
buttonContext: context.buttonContext,
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// 3. 위상 정렬
|
|||
|
|
const levels = this.topologicalSort(nodes, edges);
|
|||
|
|
logger.info(`📋 실행 순서 (레벨별):`, levels);
|
|||
|
|
|
|||
|
|
// 4. 레벨별 실행
|
|||
|
|
for (const level of levels) {
|
|||
|
|
await this.executeLevel(level, nodes, edges, context);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 5. 결과 생성
|
|||
|
|
const executionTime = Date.now() - startTime;
|
|||
|
|
const result = this.generateExecutionResult(
|
|||
|
|
nodes,
|
|||
|
|
context,
|
|||
|
|
executionTime
|
|||
|
|
);
|
|||
|
|
|
|||
|
|
logger.info(`✅ 플로우 실행 완료:`, result.summary);
|
|||
|
|
|
|||
|
|
return result;
|
|||
|
|
} catch (error) {
|
|||
|
|
logger.error(`❌ 플로우 실행 실패:`, error);
|
|||
|
|
throw error;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 소스 데이터 준비
|
|||
|
|
*/
|
|||
|
|
private static prepareSourceData(contextData: Record<string, any>): any[] {
|
|||
|
|
const { controlDataSource, formData, selectedRowsData } = contextData;
|
|||
|
|
|
|||
|
|
switch (controlDataSource) {
|
|||
|
|
case "form":
|
|||
|
|
return formData ? [formData] : [];
|
|||
|
|
|
|||
|
|
case "table-selection":
|
|||
|
|
return selectedRowsData || [];
|
|||
|
|
|
|||
|
|
case "both":
|
|||
|
|
return [
|
|||
|
|
{ source: "form", data: formData },
|
|||
|
|
{ source: "table", data: selectedRowsData },
|
|||
|
|
];
|
|||
|
|
|
|||
|
|
default:
|
|||
|
|
return formData ? [formData] : [];
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 위상 정렬 (Topological Sort)
|
|||
|
|
* DAG(Directed Acyclic Graph)를 레벨별로 그룹화
|
|||
|
|
*/
|
|||
|
|
private static topologicalSort(
|
|||
|
|
nodes: FlowNode[],
|
|||
|
|
edges: FlowEdge[]
|
|||
|
|
): string[][] {
|
|||
|
|
const levels: string[][] = [];
|
|||
|
|
const inDegree = new Map<string, number>();
|
|||
|
|
const adjacency = new Map<string, string[]>();
|
|||
|
|
|
|||
|
|
// 초기화
|
|||
|
|
nodes.forEach((node) => {
|
|||
|
|
inDegree.set(node.id, 0);
|
|||
|
|
adjacency.set(node.id, []);
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// 진입 차수 계산
|
|||
|
|
edges.forEach((edge) => {
|
|||
|
|
inDegree.set(edge.target, (inDegree.get(edge.target) || 0) + 1);
|
|||
|
|
adjacency.get(edge.source)?.push(edge.target);
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// 레벨별 분류
|
|||
|
|
let currentLevel = nodes
|
|||
|
|
.filter((node) => inDegree.get(node.id) === 0)
|
|||
|
|
.map((node) => node.id);
|
|||
|
|
|
|||
|
|
while (currentLevel.length > 0) {
|
|||
|
|
levels.push([...currentLevel]);
|
|||
|
|
|
|||
|
|
const nextLevel: string[] = [];
|
|||
|
|
currentLevel.forEach((nodeId) => {
|
|||
|
|
const neighbors = adjacency.get(nodeId) || [];
|
|||
|
|
neighbors.forEach((neighbor) => {
|
|||
|
|
const newDegree = (inDegree.get(neighbor) || 1) - 1;
|
|||
|
|
inDegree.set(neighbor, newDegree);
|
|||
|
|
if (newDegree === 0) {
|
|||
|
|
nextLevel.push(neighbor);
|
|||
|
|
}
|
|||
|
|
});
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
currentLevel = nextLevel;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return levels;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 레벨 내 노드 병렬 실행
|
|||
|
|
*/
|
|||
|
|
private static async executeLevel(
|
|||
|
|
nodeIds: string[],
|
|||
|
|
nodes: FlowNode[],
|
|||
|
|
edges: FlowEdge[],
|
|||
|
|
context: ExecutionContext
|
|||
|
|
): Promise<void> {
|
|||
|
|
logger.info(`⏳ 레벨 실행 시작: ${nodeIds.length}개 노드`);
|
|||
|
|
|
|||
|
|
// Promise.allSettled로 병렬 실행
|
|||
|
|
const results = await Promise.allSettled(
|
|||
|
|
nodeIds.map((nodeId) => this.executeNode(nodeId, nodes, edges, context))
|
|||
|
|
);
|
|||
|
|
|
|||
|
|
// 결과 저장
|
|||
|
|
results.forEach((result, index) => {
|
|||
|
|
const nodeId = nodeIds[index];
|
|||
|
|
if (result.status === "fulfilled") {
|
|||
|
|
context.nodeResults.set(nodeId, result.value);
|
|||
|
|
context.executionOrder.push(nodeId);
|
|||
|
|
} else {
|
|||
|
|
context.nodeResults.set(nodeId, {
|
|||
|
|
nodeId,
|
|||
|
|
status: "failed",
|
|||
|
|
error: result.reason,
|
|||
|
|
startTime: Date.now(),
|
|||
|
|
endTime: Date.now(),
|
|||
|
|
});
|
|||
|
|
}
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
logger.info(`✅ 레벨 실행 완료`);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 개별 노드 실행
|
|||
|
|
*/
|
|||
|
|
private static async executeNode(
|
|||
|
|
nodeId: string,
|
|||
|
|
nodes: FlowNode[],
|
|||
|
|
edges: FlowEdge[],
|
|||
|
|
context: ExecutionContext
|
|||
|
|
): Promise<NodeResult> {
|
|||
|
|
const startTime = Date.now();
|
|||
|
|
const node = nodes.find((n) => n.id === nodeId);
|
|||
|
|
|
|||
|
|
if (!node) {
|
|||
|
|
throw new Error(`노드를 찾을 수 없습니다: ${nodeId}`);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
logger.info(`🔄 노드 실행 시작: ${nodeId} (${node.type})`);
|
|||
|
|
|
|||
|
|
// 1. 부모 노드 상태 확인 (연쇄 중단)
|
|||
|
|
const parents = this.getParentNodes(nodeId, edges);
|
|||
|
|
const parentFailed = parents.some((parentId) => {
|
|||
|
|
const parentResult = context.nodeResults.get(parentId);
|
|||
|
|
return parentResult?.status === "failed";
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
if (parentFailed) {
|
|||
|
|
logger.warn(`⏭️ 노드 스킵 (부모 실패): ${nodeId}`);
|
|||
|
|
return {
|
|||
|
|
nodeId,
|
|||
|
|
status: "skipped",
|
|||
|
|
error: new Error("Parent node failed"),
|
|||
|
|
startTime,
|
|||
|
|
endTime: Date.now(),
|
|||
|
|
};
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 2. 입력 데이터 준비
|
|||
|
|
const inputData = this.prepareInputData(nodeId, parents, edges, context);
|
|||
|
|
|
|||
|
|
// 3. 노드 타입별 실행
|
|||
|
|
try {
|
|||
|
|
const result = await this.executeNodeByType(node, inputData, context);
|
|||
|
|
|
|||
|
|
logger.info(`✅ 노드 실행 성공: ${nodeId}`);
|
|||
|
|
|
|||
|
|
return {
|
|||
|
|
nodeId,
|
|||
|
|
status: "success",
|
|||
|
|
data: result,
|
|||
|
|
startTime,
|
|||
|
|
endTime: Date.now(),
|
|||
|
|
};
|
|||
|
|
} catch (error) {
|
|||
|
|
logger.error(`❌ 노드 실행 실패: ${nodeId}`, error);
|
|||
|
|
|
|||
|
|
return {
|
|||
|
|
nodeId,
|
|||
|
|
status: "failed",
|
|||
|
|
error: error as Error,
|
|||
|
|
startTime,
|
|||
|
|
endTime: Date.now(),
|
|||
|
|
};
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 부모 노드 목록 조회
|
|||
|
|
*/
|
|||
|
|
private static getParentNodes(nodeId: string, edges: FlowEdge[]): string[] {
|
|||
|
|
return edges
|
|||
|
|
.filter((edge) => edge.target === nodeId)
|
|||
|
|
.map((edge) => edge.source);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 입력 데이터 준비
|
|||
|
|
*/
|
|||
|
|
private static prepareInputData(
|
|||
|
|
nodeId: string,
|
|||
|
|
parents: string[],
|
|||
|
|
edges: FlowEdge[],
|
|||
|
|
context: ExecutionContext
|
|||
|
|
): any {
|
|||
|
|
if (parents.length === 0) {
|
|||
|
|
// 소스 노드: 원본 데이터 사용
|
|||
|
|
return context.sourceData;
|
|||
|
|
} else if (parents.length === 1) {
|
|||
|
|
// 단일 부모: 부모의 결과 데이터 전달
|
|||
|
|
const parentResult = context.nodeResults.get(parents[0]);
|
|||
|
|
return parentResult?.data || context.sourceData;
|
|||
|
|
} else {
|
|||
|
|
// 다중 부모: 모든 부모의 데이터 병합
|
|||
|
|
return parents.map((parentId) => {
|
|||
|
|
const result = context.nodeResults.get(parentId);
|
|||
|
|
return result?.data || context.sourceData;
|
|||
|
|
});
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 노드 타입별 실행 로직
|
|||
|
|
*/
|
|||
|
|
private static async executeNodeByType(
|
|||
|
|
node: FlowNode,
|
|||
|
|
inputData: any,
|
|||
|
|
context: ExecutionContext
|
|||
|
|
): Promise<any> {
|
|||
|
|
switch (node.type) {
|
|||
|
|
case "tableSource":
|
|||
|
|
return this.executeTableSource(node, context);
|
|||
|
|
|
|||
|
|
case "dataTransform":
|
|||
|
|
return this.executeDataTransform(node, inputData, context);
|
|||
|
|
|
|||
|
|
case "insertAction":
|
|||
|
|
return this.executeInsertAction(node, inputData, context);
|
|||
|
|
|
|||
|
|
case "updateAction":
|
|||
|
|
return this.executeUpdateAction(node, inputData, context);
|
|||
|
|
|
|||
|
|
case "deleteAction":
|
|||
|
|
return this.executeDeleteAction(node, inputData, context);
|
|||
|
|
|
|||
|
|
case "upsertAction":
|
|||
|
|
return this.executeUpsertAction(node, inputData, context);
|
|||
|
|
|
|||
|
|
case "condition":
|
|||
|
|
return this.executeCondition(node, inputData, context);
|
|||
|
|
|
|||
|
|
case "comment":
|
|||
|
|
case "log":
|
|||
|
|
// 로그/코멘트는 실행 없이 통과
|
|||
|
|
logger.info(`📝 ${node.type}: ${node.data.displayName || node.id}`);
|
|||
|
|
return { message: "Logged" };
|
|||
|
|
|
|||
|
|
default:
|
|||
|
|
logger.warn(`⚠️ 지원하지 않는 노드 타입: ${node.type}`);
|
|||
|
|
return { message: "Unsupported node type" };
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 테이블 소스 노드 실행
|
|||
|
|
*/
|
|||
|
|
private static async executeTableSource(
|
|||
|
|
node: FlowNode,
|
|||
|
|
context: ExecutionContext
|
|||
|
|
): Promise<any[]> {
|
|||
|
|
// 🔥 외부에서 주입된 데이터가 있으면 우선 사용
|
|||
|
|
if (
|
|||
|
|
context.sourceData &&
|
|||
|
|
Array.isArray(context.sourceData) &&
|
|||
|
|
context.sourceData.length > 0
|
|||
|
|
) {
|
|||
|
|
logger.info(
|
|||
|
|
`📊 외부 주입 데이터 사용: ${context.dataSourceType}, ${context.sourceData.length}건`
|
|||
|
|
);
|
|||
|
|
return context.sourceData;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 외부 데이터가 없으면 DB 쿼리 실행
|
|||
|
|
const { tableName, schema, whereConditions } = node.data;
|
|||
|
|
|
|||
|
|
if (!tableName) {
|
|||
|
|
logger.warn(
|
|||
|
|
"⚠️ 테이블 소스 노드에 테이블명이 없고, 외부 데이터도 없습니다."
|
|||
|
|
);
|
|||
|
|
return [];
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
const schemaPrefix = schema ? `${schema}.` : "";
|
|||
|
|
const whereClause = whereConditions
|
|||
|
|
? `WHERE ${this.buildWhereClause(whereConditions)}`
|
|||
|
|
: "";
|
|||
|
|
|
|||
|
|
const sql = `SELECT * FROM ${schemaPrefix}${tableName} ${whereClause}`;
|
|||
|
|
|
|||
|
|
const result = await query(sql, []);
|
|||
|
|
|
|||
|
|
logger.info(`📊 테이블 소스 조회: ${tableName}, ${result.length}건`);
|
|||
|
|
|
|||
|
|
return result;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* INSERT 액션 노드 실행
|
|||
|
|
*/
|
|||
|
|
private static async executeInsertAction(
|
|||
|
|
node: FlowNode,
|
|||
|
|
inputData: any,
|
|||
|
|
context: ExecutionContext
|
|||
|
|
): Promise<any> {
|
|||
|
|
const { targetTable, fieldMappings } = node.data;
|
|||
|
|
|
|||
|
|
return transaction(async (client) => {
|
|||
|
|
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
|||
|
|
let insertedCount = 0;
|
|||
|
|
|
|||
|
|
for (const data of dataArray) {
|
|||
|
|
const fields: string[] = [];
|
|||
|
|
const values: any[] = [];
|
|||
|
|
let paramIndex = 1;
|
|||
|
|
|
|||
|
|
fieldMappings.forEach((mapping: any) => {
|
|||
|
|
fields.push(mapping.targetField);
|
|||
|
|
const value =
|
|||
|
|
mapping.staticValue !== undefined
|
|||
|
|
? mapping.staticValue
|
|||
|
|
: data[mapping.sourceField];
|
|||
|
|
values.push(value);
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
const sql = `
|
|||
|
|
INSERT INTO ${targetTable} (${fields.join(", ")})
|
|||
|
|
VALUES (${fields.map((_, i) => `$${i + 1}`).join(", ")})
|
|||
|
|
`;
|
|||
|
|
|
|||
|
|
await client.query(sql, values);
|
|||
|
|
insertedCount++;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
logger.info(`✅ INSERT 완료: ${targetTable}, ${insertedCount}건`);
|
|||
|
|
|
|||
|
|
return { insertedCount };
|
|||
|
|
});
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* UPDATE 액션 노드 실행
|
|||
|
|
*/
|
|||
|
|
private static async executeUpdateAction(
|
|||
|
|
node: FlowNode,
|
|||
|
|
inputData: any,
|
|||
|
|
context: ExecutionContext
|
|||
|
|
): Promise<any> {
|
|||
|
|
const { targetTable, fieldMappings, whereConditions } = node.data;
|
|||
|
|
|
|||
|
|
return transaction(async (client) => {
|
|||
|
|
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
|||
|
|
let updatedCount = 0;
|
|||
|
|
|
|||
|
|
for (const data of dataArray) {
|
|||
|
|
const setClauses: string[] = [];
|
|||
|
|
const values: any[] = [];
|
|||
|
|
let paramIndex = 1;
|
|||
|
|
|
|||
|
|
fieldMappings.forEach((mapping: any) => {
|
|||
|
|
const value =
|
|||
|
|
mapping.staticValue !== undefined
|
|||
|
|
? mapping.staticValue
|
|||
|
|
: data[mapping.sourceField];
|
|||
|
|
setClauses.push(`${mapping.targetField} = $${paramIndex}`);
|
|||
|
|
values.push(value);
|
|||
|
|
paramIndex++;
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
const whereClause = this.buildWhereClause(
|
|||
|
|
whereConditions,
|
|||
|
|
data,
|
|||
|
|
paramIndex
|
|||
|
|
);
|
|||
|
|
|
|||
|
|
const sql = `
|
|||
|
|
UPDATE ${targetTable}
|
|||
|
|
SET ${setClauses.join(", ")}
|
|||
|
|
${whereClause}
|
|||
|
|
`;
|
|||
|
|
|
|||
|
|
const result = await client.query(sql, values);
|
|||
|
|
updatedCount += result.rowCount || 0;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
logger.info(`✅ UPDATE 완료: ${targetTable}, ${updatedCount}건`);
|
|||
|
|
|
|||
|
|
return { updatedCount };
|
|||
|
|
});
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* DELETE 액션 노드 실행
|
|||
|
|
*/
|
|||
|
|
private static async executeDeleteAction(
|
|||
|
|
node: FlowNode,
|
|||
|
|
inputData: any,
|
|||
|
|
context: ExecutionContext
|
|||
|
|
): Promise<any> {
|
|||
|
|
const { targetTable, whereConditions } = node.data;
|
|||
|
|
|
|||
|
|
return transaction(async (client) => {
|
|||
|
|
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
|||
|
|
let deletedCount = 0;
|
|||
|
|
|
|||
|
|
for (const data of dataArray) {
|
|||
|
|
const whereClause = this.buildWhereClause(whereConditions, data, 1);
|
|||
|
|
|
|||
|
|
const sql = `DELETE FROM ${targetTable} ${whereClause}`;
|
|||
|
|
|
|||
|
|
const result = await client.query(sql, []);
|
|||
|
|
deletedCount += result.rowCount || 0;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
logger.info(`✅ DELETE 완료: ${targetTable}, ${deletedCount}건`);
|
|||
|
|
|
|||
|
|
return { deletedCount };
|
|||
|
|
});
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* UPSERT 액션 노드 실행 (로직 기반)
|
|||
|
|
* DB 제약 조건 없이 SELECT → UPDATE or INSERT 방식으로 구현
|
|||
|
|
*/
|
|||
|
|
private static async executeUpsertAction(
|
|||
|
|
node: FlowNode,
|
|||
|
|
inputData: any,
|
|||
|
|
context: ExecutionContext
|
|||
|
|
): Promise<any> {
|
|||
|
|
const { targetTable, fieldMappings, conflictKeys } = node.data;
|
|||
|
|
|
|||
|
|
if (!targetTable || !fieldMappings || fieldMappings.length === 0) {
|
|||
|
|
throw new Error("UPSERT 액션에 필수 설정이 누락되었습니다.");
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if (!conflictKeys || conflictKeys.length === 0) {
|
|||
|
|
throw new Error("UPSERT 액션에 충돌 키(Conflict Keys)가 필요합니다.");
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return transaction(async (client) => {
|
|||
|
|
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
|||
|
|
let insertedCount = 0;
|
|||
|
|
let updatedCount = 0;
|
|||
|
|
|
|||
|
|
for (const data of dataArray) {
|
|||
|
|
// 1. 충돌 키 값 추출
|
|||
|
|
const conflictKeyValues: Record<string, any> = {};
|
|||
|
|
conflictKeys.forEach((key: string) => {
|
|||
|
|
const mapping = fieldMappings.find((m: any) => m.targetField === key);
|
|||
|
|
if (mapping) {
|
|||
|
|
conflictKeyValues[key] =
|
|||
|
|
mapping.staticValue !== undefined
|
|||
|
|
? mapping.staticValue
|
|||
|
|
: data[mapping.sourceField];
|
|||
|
|
}
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// 2. 존재 여부 확인 (SELECT)
|
|||
|
|
const whereConditions = conflictKeys
|
|||
|
|
.map((key: string, index: number) => `${key} = $${index + 1}`)
|
|||
|
|
.join(" AND ");
|
|||
|
|
const whereValues = conflictKeys.map(
|
|||
|
|
(key: string) => conflictKeyValues[key]
|
|||
|
|
);
|
|||
|
|
|
|||
|
|
const checkSql = `SELECT id FROM ${targetTable} WHERE ${whereConditions} LIMIT 1`;
|
|||
|
|
const existingRow = await client.query(checkSql, whereValues);
|
|||
|
|
|
|||
|
|
if (existingRow.rows.length > 0) {
|
|||
|
|
// 3-A. 존재하면 UPDATE
|
|||
|
|
const setClauses: string[] = [];
|
|||
|
|
const updateValues: any[] = [];
|
|||
|
|
let paramIndex = 1;
|
|||
|
|
|
|||
|
|
fieldMappings.forEach((mapping: any) => {
|
|||
|
|
// 충돌 키가 아닌 필드만 UPDATE
|
|||
|
|
if (!conflictKeys.includes(mapping.targetField)) {
|
|||
|
|
const value =
|
|||
|
|
mapping.staticValue !== undefined
|
|||
|
|
? mapping.staticValue
|
|||
|
|
: data[mapping.sourceField];
|
|||
|
|
setClauses.push(`${mapping.targetField} = $${paramIndex}`);
|
|||
|
|
updateValues.push(value);
|
|||
|
|
paramIndex++;
|
|||
|
|
}
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
// WHERE 조건 생성 (파라미터 인덱스 이어서)
|
|||
|
|
const updateWhereConditions = conflictKeys
|
|||
|
|
.map(
|
|||
|
|
(key: string, index: number) => `${key} = $${paramIndex + index}`
|
|||
|
|
)
|
|||
|
|
.join(" AND ");
|
|||
|
|
|
|||
|
|
// WHERE 조건 값 추가
|
|||
|
|
whereValues.forEach((val: any) => {
|
|||
|
|
updateValues.push(val);
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
const updateSql = `
|
|||
|
|
UPDATE ${targetTable}
|
|||
|
|
SET ${setClauses.join(", ")}
|
|||
|
|
WHERE ${updateWhereConditions}
|
|||
|
|
`;
|
|||
|
|
|
|||
|
|
logger.info(`🔄 UPDATE 실행:`, {
|
|||
|
|
table: targetTable,
|
|||
|
|
conflictKeys,
|
|||
|
|
conflictKeyValues,
|
|||
|
|
sql: updateSql,
|
|||
|
|
values: updateValues,
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
await client.query(updateSql, updateValues);
|
|||
|
|
updatedCount++;
|
|||
|
|
} else {
|
|||
|
|
// 3-B. 없으면 INSERT
|
|||
|
|
const columns: string[] = [];
|
|||
|
|
const values: any[] = [];
|
|||
|
|
|
|||
|
|
fieldMappings.forEach((mapping: any) => {
|
|||
|
|
const value =
|
|||
|
|
mapping.staticValue !== undefined
|
|||
|
|
? mapping.staticValue
|
|||
|
|
: data[mapping.sourceField];
|
|||
|
|
columns.push(mapping.targetField);
|
|||
|
|
values.push(value);
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
const placeholders = values.map((_, i) => `$${i + 1}`).join(", ");
|
|||
|
|
const insertSql = `
|
|||
|
|
INSERT INTO ${targetTable} (${columns.join(", ")})
|
|||
|
|
VALUES (${placeholders})
|
|||
|
|
`;
|
|||
|
|
|
|||
|
|
logger.info(`➕ INSERT 실행:`, {
|
|||
|
|
table: targetTable,
|
|||
|
|
conflictKeys,
|
|||
|
|
conflictKeyValues,
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
await client.query(insertSql, values);
|
|||
|
|
insertedCount++;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
logger.info(
|
|||
|
|
`✅ UPSERT 완료: ${targetTable}, INSERT ${insertedCount}건, UPDATE ${updatedCount}건`
|
|||
|
|
);
|
|||
|
|
|
|||
|
|
return {
|
|||
|
|
insertedCount,
|
|||
|
|
updatedCount,
|
|||
|
|
totalCount: insertedCount + updatedCount,
|
|||
|
|
};
|
|||
|
|
});
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 조건 노드 실행
|
|||
|
|
*/
|
|||
|
|
private static async executeCondition(
|
|||
|
|
node: FlowNode,
|
|||
|
|
inputData: any,
|
|||
|
|
context: ExecutionContext
|
|||
|
|
): Promise<boolean> {
|
|||
|
|
const { conditions, logic } = node.data;
|
|||
|
|
|
|||
|
|
const results = conditions.map((condition: any) => {
|
|||
|
|
const fieldValue = inputData[condition.field];
|
|||
|
|
return this.evaluateCondition(
|
|||
|
|
fieldValue,
|
|||
|
|
condition.operator,
|
|||
|
|
condition.value
|
|||
|
|
);
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
const result =
|
|||
|
|
logic === "OR"
|
|||
|
|
? results.some((r: boolean) => r)
|
|||
|
|
: results.every((r: boolean) => r);
|
|||
|
|
|
|||
|
|
logger.info(`🔍 조건 평가 결과: ${result}`);
|
|||
|
|
|
|||
|
|
return result;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* WHERE 절 생성
|
|||
|
|
*/
|
|||
|
|
private static buildWhereClause(
|
|||
|
|
conditions: any[],
|
|||
|
|
data?: any,
|
|||
|
|
startIndex: number = 1
|
|||
|
|
): string {
|
|||
|
|
if (!conditions || conditions.length === 0) {
|
|||
|
|
return "";
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
const clauses = conditions.map((condition, index) => {
|
|||
|
|
const value = data ? data[condition.field] : condition.value;
|
|||
|
|
return `${condition.field} ${condition.operator} $${startIndex + index}`;
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
return `WHERE ${clauses.join(" AND ")}`;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 조건 평가
|
|||
|
|
*/
|
|||
|
|
private static evaluateCondition(
|
|||
|
|
fieldValue: any,
|
|||
|
|
operator: string,
|
|||
|
|
expectedValue: any
|
|||
|
|
): boolean {
|
|||
|
|
switch (operator) {
|
|||
|
|
case "equals":
|
|||
|
|
case "=":
|
|||
|
|
return fieldValue === expectedValue;
|
|||
|
|
case "notEquals":
|
|||
|
|
case "!=":
|
|||
|
|
return fieldValue !== expectedValue;
|
|||
|
|
case "greaterThan":
|
|||
|
|
case ">":
|
|||
|
|
return fieldValue > expectedValue;
|
|||
|
|
case "lessThan":
|
|||
|
|
case "<":
|
|||
|
|
return fieldValue < expectedValue;
|
|||
|
|
case "contains":
|
|||
|
|
return String(fieldValue).includes(String(expectedValue));
|
|||
|
|
default:
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 실행 결과 생성
|
|||
|
|
*/
|
|||
|
|
private static generateExecutionResult(
|
|||
|
|
nodes: FlowNode[],
|
|||
|
|
context: ExecutionContext,
|
|||
|
|
executionTime: number
|
|||
|
|
): ExecutionResult {
|
|||
|
|
const nodeSummaries: NodeExecutionSummary[] = nodes.map((node) => {
|
|||
|
|
const result = context.nodeResults.get(node.id);
|
|||
|
|
return {
|
|||
|
|
nodeId: node.id,
|
|||
|
|
nodeName: node.data.displayName || node.id,
|
|||
|
|
nodeType: node.type,
|
|||
|
|
status: result?.status || "pending",
|
|||
|
|
duration: result?.endTime
|
|||
|
|
? result.endTime - result.startTime
|
|||
|
|
: undefined,
|
|||
|
|
error: result?.error?.message,
|
|||
|
|
};
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
const summary = {
|
|||
|
|
total: nodes.length,
|
|||
|
|
success: nodeSummaries.filter((n) => n.status === "success").length,
|
|||
|
|
failed: nodeSummaries.filter((n) => n.status === "failed").length,
|
|||
|
|
skipped: nodeSummaries.filter((n) => n.status === "skipped").length,
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
const success = summary.failed === 0;
|
|||
|
|
|
|||
|
|
return {
|
|||
|
|
success,
|
|||
|
|
message: success
|
|||
|
|
? `플로우 실행 성공 (${summary.success}/${summary.total})`
|
|||
|
|
: `플로우 실행 부분 실패 (성공: ${summary.success}, 실패: ${summary.failed})`,
|
|||
|
|
executionTime,
|
|||
|
|
nodes: nodeSummaries,
|
|||
|
|
summary,
|
|||
|
|
};
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 데이터 변환 노드 실행
|
|||
|
|
*/
|
|||
|
|
private static async executeDataTransform(
|
|||
|
|
node: FlowNode,
|
|||
|
|
inputData: any,
|
|||
|
|
context: ExecutionContext
|
|||
|
|
): Promise<any[]> {
|
|||
|
|
const { transformations } = node.data;
|
|||
|
|
|
|||
|
|
if (
|
|||
|
|
!transformations ||
|
|||
|
|
!Array.isArray(transformations) ||
|
|||
|
|
transformations.length === 0
|
|||
|
|
) {
|
|||
|
|
logger.warn(`⚠️ 데이터 변환 노드에 변환 규칙이 없습니다: ${node.id}`);
|
|||
|
|
return Array.isArray(inputData) ? inputData : [inputData];
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// inputData를 배열로 정규화
|
|||
|
|
const rows = Array.isArray(inputData) ? inputData : [inputData];
|
|||
|
|
logger.info(
|
|||
|
|
`🔄 데이터 변환 시작: ${rows.length}개 행, ${transformations.length}개 변환`
|
|||
|
|
);
|
|||
|
|
|
|||
|
|
// 각 변환 규칙을 순차적으로 적용
|
|||
|
|
let transformedRows = rows;
|
|||
|
|
|
|||
|
|
for (const transform of transformations) {
|
|||
|
|
const transformType = transform.type;
|
|||
|
|
logger.info(` 🔹 변환 적용: ${transformType}`);
|
|||
|
|
|
|||
|
|
transformedRows = this.applyTransformation(transformedRows, transform);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
logger.info(`✅ 데이터 변환 완료: ${transformedRows.length}개 행`);
|
|||
|
|
return transformedRows;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 단일 변환 규칙 적용
|
|||
|
|
*/
|
|||
|
|
private static applyTransformation(rows: any[], transform: any): any[] {
|
|||
|
|
const {
|
|||
|
|
type,
|
|||
|
|
sourceField,
|
|||
|
|
targetField,
|
|||
|
|
delimiter,
|
|||
|
|
separator,
|
|||
|
|
searchValue,
|
|||
|
|
replaceValue,
|
|||
|
|
splitIndex,
|
|||
|
|
castType,
|
|||
|
|
expression,
|
|||
|
|
} = transform;
|
|||
|
|
|
|||
|
|
// 타겟 필드 결정 (비어있으면 소스 필드 사용 = in-place)
|
|||
|
|
const actualTargetField = targetField || sourceField;
|
|||
|
|
|
|||
|
|
switch (type) {
|
|||
|
|
case "UPPERCASE":
|
|||
|
|
return rows.map((row) => ({
|
|||
|
|
...row,
|
|||
|
|
[actualTargetField]:
|
|||
|
|
row[sourceField]?.toString().toUpperCase() || row[sourceField],
|
|||
|
|
}));
|
|||
|
|
|
|||
|
|
case "LOWERCASE":
|
|||
|
|
return rows.map((row) => ({
|
|||
|
|
...row,
|
|||
|
|
[actualTargetField]:
|
|||
|
|
row[sourceField]?.toString().toLowerCase() || row[sourceField],
|
|||
|
|
}));
|
|||
|
|
|
|||
|
|
case "TRIM":
|
|||
|
|
return rows.map((row) => ({
|
|||
|
|
...row,
|
|||
|
|
[actualTargetField]:
|
|||
|
|
row[sourceField]?.toString().trim() || row[sourceField],
|
|||
|
|
}));
|
|||
|
|
|
|||
|
|
case "EXPLODE":
|
|||
|
|
return this.applyExplode(
|
|||
|
|
rows,
|
|||
|
|
sourceField,
|
|||
|
|
actualTargetField,
|
|||
|
|
delimiter || ","
|
|||
|
|
);
|
|||
|
|
|
|||
|
|
case "CONCAT":
|
|||
|
|
return rows.map((row) => {
|
|||
|
|
const value1 = row[sourceField] || "";
|
|||
|
|
// CONCAT은 여러 필드를 합칠 수 있지만, 단순화하여 expression 사용
|
|||
|
|
const value2 = expression || "";
|
|||
|
|
return {
|
|||
|
|
...row,
|
|||
|
|
[actualTargetField]: `${value1}${separator || ""}${value2}`,
|
|||
|
|
};
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
case "SPLIT":
|
|||
|
|
return rows.map((row) => {
|
|||
|
|
const value = row[sourceField]?.toString() || "";
|
|||
|
|
const parts = value.split(delimiter || ",");
|
|||
|
|
const index = splitIndex !== undefined ? splitIndex : 0;
|
|||
|
|
return {
|
|||
|
|
...row,
|
|||
|
|
[actualTargetField]: parts[index] || "",
|
|||
|
|
};
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
case "REPLACE":
|
|||
|
|
return rows.map((row) => {
|
|||
|
|
const value = row[sourceField]?.toString() || "";
|
|||
|
|
const replaced = value.replace(
|
|||
|
|
new RegExp(searchValue || "", "g"),
|
|||
|
|
replaceValue || ""
|
|||
|
|
);
|
|||
|
|
return {
|
|||
|
|
...row,
|
|||
|
|
[actualTargetField]: replaced,
|
|||
|
|
};
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
case "CAST":
|
|||
|
|
return rows.map((row) => {
|
|||
|
|
const value = row[sourceField];
|
|||
|
|
let castedValue = value;
|
|||
|
|
|
|||
|
|
switch (castType) {
|
|||
|
|
case "string":
|
|||
|
|
castedValue = value?.toString() || "";
|
|||
|
|
break;
|
|||
|
|
case "number":
|
|||
|
|
castedValue = parseFloat(value) || 0;
|
|||
|
|
break;
|
|||
|
|
case "boolean":
|
|||
|
|
castedValue = Boolean(value);
|
|||
|
|
break;
|
|||
|
|
case "date":
|
|||
|
|
castedValue = new Date(value);
|
|||
|
|
break;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return {
|
|||
|
|
...row,
|
|||
|
|
[actualTargetField]: castedValue,
|
|||
|
|
};
|
|||
|
|
});
|
|||
|
|
|
|||
|
|
case "FORMAT":
|
|||
|
|
case "CALCULATE":
|
|||
|
|
case "JSON_EXTRACT":
|
|||
|
|
case "CUSTOM":
|
|||
|
|
// 표현식 기반 변환 (현재는 단순 구현)
|
|||
|
|
logger.warn(`⚠️ ${type} 변환은 아직 완전히 구현되지 않았습니다.`);
|
|||
|
|
return rows.map((row) => ({
|
|||
|
|
...row,
|
|||
|
|
[actualTargetField]: row[sourceField] || "",
|
|||
|
|
}));
|
|||
|
|
|
|||
|
|
default:
|
|||
|
|
logger.warn(`⚠️ 지원하지 않는 변환 타입: ${type}`);
|
|||
|
|
return rows;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* EXPLODE 변환: 1개 행을 N개 행으로 확장
|
|||
|
|
*/
|
|||
|
|
private static applyExplode(
|
|||
|
|
rows: any[],
|
|||
|
|
sourceField: string,
|
|||
|
|
targetField: string,
|
|||
|
|
delimiter: string
|
|||
|
|
): any[] {
|
|||
|
|
const expandedRows: any[] = [];
|
|||
|
|
|
|||
|
|
for (const row of rows) {
|
|||
|
|
const value = row[sourceField];
|
|||
|
|
|
|||
|
|
if (!value) {
|
|||
|
|
// 값이 없으면 원본 행 유지
|
|||
|
|
expandedRows.push(row);
|
|||
|
|
continue;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 문자열을 구분자로 분리
|
|||
|
|
const values = value
|
|||
|
|
.toString()
|
|||
|
|
.split(delimiter)
|
|||
|
|
.map((v: string) => v.trim());
|
|||
|
|
|
|||
|
|
// 각 값마다 새 행 생성
|
|||
|
|
for (const val of values) {
|
|||
|
|
expandedRows.push({
|
|||
|
|
...row, // 다른 필드들은 복제
|
|||
|
|
[targetField]: val, // 타겟 필드에 분리된 값 저장
|
|||
|
|
});
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
logger.info(
|
|||
|
|
` 💥 EXPLODE: ${rows.length}개 행 → ${expandedRows.length}개 행`
|
|||
|
|
);
|
|||
|
|
return expandedRows;
|
|||
|
|
}
|
|||
|
|
}
|