3177 lines
95 KiB
TypeScript
3177 lines
95 KiB
TypeScript
/**
|
||
* 노드 플로우 실행 엔진
|
||
*
|
||
* 기능:
|
||
* - 위상 정렬 (Topological Sort)
|
||
* - 레벨별 병렬 실행 (Promise.allSettled)
|
||
* - 독립 트랜잭션 처리
|
||
* - 연쇄 중단 (부모 실패 시 자식 스킵)
|
||
*/
|
||
|
||
import { query, queryOne, transaction } from "../database/db";
|
||
import { logger } from "../utils/logger";
|
||
import axios from "axios";
|
||
|
||
// ===== 타입 정의 =====
|
||
|
||
export interface FlowNode {
|
||
id: string;
|
||
type: NodeType;
|
||
position?: { x: number; y: number };
|
||
data: NodeData;
|
||
}
|
||
|
||
export type NodeType =
|
||
| "tableSource"
|
||
| "externalDBSource"
|
||
| "restAPISource"
|
||
| "condition"
|
||
| "dataTransform"
|
||
| "insertAction"
|
||
| "updateAction"
|
||
| "deleteAction"
|
||
| "upsertAction"
|
||
| "comment"
|
||
| "log";
|
||
|
||
export interface NodeData {
|
||
displayName?: string;
|
||
[key: string]: any;
|
||
}
|
||
|
||
export interface FlowEdge {
|
||
id: string;
|
||
source: string;
|
||
target: string;
|
||
sourceHandle?: string;
|
||
targetHandle?: string;
|
||
}
|
||
|
||
export interface ExecutionContext {
|
||
sourceData?: any[]; // 외부에서 주입된 데이터 (선택된 행 또는 폼 데이터)
|
||
dataSourceType?: string; // "table-selection" | "form" | "none"
|
||
nodeResults: Map<string, NodeResult>;
|
||
executionOrder: string[];
|
||
buttonContext?: ButtonContext;
|
||
// 🆕 현재 실행 중인 소스 노드의 dataSourceType (context-data | table-all)
|
||
currentNodeDataSourceType?: string;
|
||
}
|
||
|
||
export interface ButtonContext {
|
||
buttonId: string;
|
||
screenId?: number;
|
||
companyCode?: string;
|
||
userId?: string;
|
||
formData?: Record<string, any>;
|
||
selectedRowsData?: Record<string, any>[];
|
||
}
|
||
|
||
export interface NodeResult {
|
||
nodeId: string;
|
||
status: "pending" | "success" | "failed" | "skipped";
|
||
data?: any;
|
||
error?: Error;
|
||
startTime: number;
|
||
endTime?: number;
|
||
}
|
||
|
||
export interface ExecutionResult {
|
||
success: boolean;
|
||
message: string;
|
||
executionTime: number;
|
||
nodes: NodeExecutionSummary[];
|
||
summary: {
|
||
total: number;
|
||
success: number;
|
||
failed: number;
|
||
skipped: number;
|
||
};
|
||
}
|
||
|
||
export interface NodeExecutionSummary {
|
||
nodeId: string;
|
||
nodeName: string;
|
||
nodeType: NodeType;
|
||
status: "success" | "failed" | "skipped" | "pending";
|
||
duration?: number;
|
||
error?: string;
|
||
}
|
||
|
||
// ===== 메인 실행 서비스 =====
|
||
|
||
export class NodeFlowExecutionService {
|
||
/**
|
||
* 플로우 실행 메인 함수
|
||
*/
|
||
static async executeFlow(
|
||
flowId: number,
|
||
contextData: Record<string, any>
|
||
): Promise<ExecutionResult> {
|
||
const startTime = Date.now();
|
||
|
||
try {
|
||
logger.info(`🚀 플로우 실행 시작: flowId=${flowId}`);
|
||
|
||
// 1. 플로우 데이터 조회
|
||
const flow = await queryOne<{
|
||
flow_id: number;
|
||
flow_name: string;
|
||
flow_data: any;
|
||
}>(
|
||
`SELECT flow_id, flow_name, flow_data FROM node_flows WHERE flow_id = $1`,
|
||
[flowId]
|
||
);
|
||
|
||
if (!flow) {
|
||
throw new Error(`플로우를 찾을 수 없습니다: flowId=${flowId}`);
|
||
}
|
||
|
||
const flowData =
|
||
typeof flow.flow_data === "string"
|
||
? JSON.parse(flow.flow_data)
|
||
: flow.flow_data;
|
||
|
||
const { nodes, edges } = flowData;
|
||
|
||
logger.info(`📊 플로우 정보:`, {
|
||
flowName: flow.flow_name,
|
||
nodeCount: nodes.length,
|
||
edgeCount: edges.length,
|
||
});
|
||
|
||
// 2. 실행 컨텍스트 준비
|
||
const context: ExecutionContext = {
|
||
sourceData: contextData.sourceData || [],
|
||
dataSourceType: contextData.dataSourceType || "none",
|
||
nodeResults: new Map(),
|
||
executionOrder: [],
|
||
buttonContext: {
|
||
buttonId:
|
||
contextData.buttonId || contextData.context?.buttonId || "unknown",
|
||
screenId: contextData.screenId || contextData.context?.screenId,
|
||
companyCode:
|
||
contextData.companyCode || contextData.context?.companyCode,
|
||
userId: contextData.userId || contextData.context?.userId,
|
||
formData: contextData.formData || contextData.context?.formData,
|
||
selectedRowsData:
|
||
contextData.selectedRowsData ||
|
||
contextData.context?.selectedRowsData,
|
||
},
|
||
};
|
||
|
||
logger.info(`📦 실행 컨텍스트:`, {
|
||
dataSourceType: context.dataSourceType,
|
||
sourceDataCount: context.sourceData?.length || 0,
|
||
buttonContext: context.buttonContext,
|
||
});
|
||
|
||
// 3. 위상 정렬
|
||
const levels = this.topologicalSort(nodes, edges);
|
||
logger.info(`📋 실행 순서 (레벨별):`, levels);
|
||
|
||
// 4. 🔥 전체 플로우를 하나의 트랜잭션으로 실행
|
||
let result: ExecutionResult;
|
||
|
||
try {
|
||
result = await transaction(async (client) => {
|
||
// 트랜잭션 내에서 레벨별 실행
|
||
for (const level of levels) {
|
||
await this.executeLevel(level, nodes, edges, context, client);
|
||
}
|
||
|
||
// 5. 결과 생성
|
||
const executionTime = Date.now() - startTime;
|
||
const executionResult = this.generateExecutionResult(
|
||
nodes,
|
||
context,
|
||
executionTime
|
||
);
|
||
|
||
// 실패한 액션 노드가 있으면 롤백
|
||
const failedActionNodes = Array.from(
|
||
context.nodeResults.values()
|
||
).filter(
|
||
(result) =>
|
||
result.status === "failed" &&
|
||
nodes.find(
|
||
(n: FlowNode) =>
|
||
n.id === result.nodeId && this.isActionNode(n.type)
|
||
)
|
||
);
|
||
|
||
if (failedActionNodes.length > 0) {
|
||
logger.warn(
|
||
`🔄 액션 노드 실패 감지 (${failedActionNodes.length}개), 트랜잭션 롤백`
|
||
);
|
||
throw new Error(
|
||
`액션 노드 실패: ${failedActionNodes.map((n) => n.nodeId).join(", ")}`
|
||
);
|
||
}
|
||
|
||
return executionResult;
|
||
});
|
||
|
||
logger.info(`✅ 플로우 실행 완료:`, result.summary);
|
||
return result;
|
||
} catch (error) {
|
||
logger.error(`❌ 플로우 실행 실패, 모든 변경사항 롤백됨:`, error);
|
||
throw error;
|
||
}
|
||
} catch (error) {
|
||
logger.error(`❌ 플로우 실행 실패:`, error);
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 소스 데이터 준비
|
||
*/
|
||
private static prepareSourceData(contextData: Record<string, any>): any[] {
|
||
const { controlDataSource, formData, selectedRowsData } = contextData;
|
||
|
||
switch (controlDataSource) {
|
||
case "form":
|
||
return formData ? [formData] : [];
|
||
|
||
case "table-selection":
|
||
return selectedRowsData || [];
|
||
|
||
case "both":
|
||
return [
|
||
{ source: "form", data: formData },
|
||
{ source: "table", data: selectedRowsData },
|
||
];
|
||
|
||
default:
|
||
return formData ? [formData] : [];
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 위상 정렬 (Topological Sort)
|
||
* DAG(Directed Acyclic Graph)를 레벨별로 그룹화
|
||
*/
|
||
private static topologicalSort(
|
||
nodes: FlowNode[],
|
||
edges: FlowEdge[]
|
||
): string[][] {
|
||
const levels: string[][] = [];
|
||
const inDegree = new Map<string, number>();
|
||
const adjacency = new Map<string, string[]>();
|
||
|
||
// 초기화
|
||
nodes.forEach((node) => {
|
||
inDegree.set(node.id, 0);
|
||
adjacency.set(node.id, []);
|
||
});
|
||
|
||
// 진입 차수 계산
|
||
edges.forEach((edge) => {
|
||
inDegree.set(edge.target, (inDegree.get(edge.target) || 0) + 1);
|
||
adjacency.get(edge.source)?.push(edge.target);
|
||
});
|
||
|
||
// 레벨별 분류
|
||
let currentLevel = nodes
|
||
.filter((node) => inDegree.get(node.id) === 0)
|
||
.map((node) => node.id);
|
||
|
||
while (currentLevel.length > 0) {
|
||
levels.push([...currentLevel]);
|
||
|
||
const nextLevel: string[] = [];
|
||
currentLevel.forEach((nodeId) => {
|
||
const neighbors = adjacency.get(nodeId) || [];
|
||
neighbors.forEach((neighbor) => {
|
||
const newDegree = (inDegree.get(neighbor) || 1) - 1;
|
||
inDegree.set(neighbor, newDegree);
|
||
if (newDegree === 0) {
|
||
nextLevel.push(neighbor);
|
||
}
|
||
});
|
||
});
|
||
|
||
currentLevel = nextLevel;
|
||
}
|
||
|
||
return levels;
|
||
}
|
||
|
||
/**
|
||
* 레벨 내 노드 병렬 실행
|
||
*/
|
||
private static async executeLevel(
|
||
nodeIds: string[],
|
||
nodes: FlowNode[],
|
||
edges: FlowEdge[],
|
||
context: ExecutionContext,
|
||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||
): Promise<void> {
|
||
logger.info(`⏳ 레벨 실행 시작: ${nodeIds.length}개 노드`);
|
||
|
||
// Promise.allSettled로 병렬 실행
|
||
const results = await Promise.allSettled(
|
||
nodeIds.map((nodeId) =>
|
||
this.executeNode(nodeId, nodes, edges, context, client)
|
||
)
|
||
);
|
||
|
||
// 결과 저장
|
||
results.forEach((result, index) => {
|
||
const nodeId = nodeIds[index];
|
||
if (result.status === "fulfilled") {
|
||
context.nodeResults.set(nodeId, result.value);
|
||
context.executionOrder.push(nodeId);
|
||
} else {
|
||
context.nodeResults.set(nodeId, {
|
||
nodeId,
|
||
status: "failed",
|
||
error: result.reason,
|
||
startTime: Date.now(),
|
||
endTime: Date.now(),
|
||
});
|
||
}
|
||
});
|
||
|
||
logger.info(`✅ 레벨 실행 완료`);
|
||
}
|
||
|
||
/**
|
||
* 개별 노드 실행
|
||
*/
|
||
private static async executeNode(
|
||
nodeId: string,
|
||
nodes: FlowNode[],
|
||
edges: FlowEdge[],
|
||
context: ExecutionContext,
|
||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||
): Promise<NodeResult> {
|
||
const startTime = Date.now();
|
||
const node = nodes.find((n) => n.id === nodeId);
|
||
|
||
if (!node) {
|
||
throw new Error(`노드를 찾을 수 없습니다: ${nodeId}`);
|
||
}
|
||
|
||
logger.info(`🔄 노드 실행 시작: ${nodeId} (${node.type})`);
|
||
|
||
// 1. 부모 노드 상태 확인 (연쇄 중단)
|
||
const parents = this.getParentNodes(nodeId, edges);
|
||
const parentFailed = parents.some((parentId) => {
|
||
const parentResult = context.nodeResults.get(parentId);
|
||
return parentResult?.status === "failed";
|
||
});
|
||
|
||
if (parentFailed) {
|
||
logger.warn(`⏭️ 노드 스킵 (부모 실패): ${nodeId}`);
|
||
return {
|
||
nodeId,
|
||
status: "skipped",
|
||
error: new Error("Parent node failed"),
|
||
startTime,
|
||
endTime: Date.now(),
|
||
};
|
||
}
|
||
|
||
// 2. 입력 데이터 준비
|
||
const inputData = this.prepareInputData(nodeId, parents, edges, context);
|
||
|
||
// 3. 노드 타입별 실행
|
||
try {
|
||
const result = await this.executeNodeByType(
|
||
node,
|
||
inputData,
|
||
context,
|
||
client
|
||
);
|
||
|
||
logger.info(`✅ 노드 실행 성공: ${nodeId}`);
|
||
|
||
return {
|
||
nodeId,
|
||
status: "success",
|
||
data: result,
|
||
startTime,
|
||
endTime: Date.now(),
|
||
};
|
||
} catch (error) {
|
||
logger.error(`❌ 노드 실행 실패: ${nodeId}`, error);
|
||
|
||
return {
|
||
nodeId,
|
||
status: "failed",
|
||
error: error as Error,
|
||
startTime,
|
||
endTime: Date.now(),
|
||
};
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 부모 노드 목록 조회
|
||
*/
|
||
private static getParentNodes(nodeId: string, edges: FlowEdge[]): string[] {
|
||
return edges
|
||
.filter((edge) => edge.target === nodeId)
|
||
.map((edge) => edge.source);
|
||
}
|
||
|
||
/**
|
||
* 입력 데이터 준비
|
||
*/
|
||
private static prepareInputData(
|
||
nodeId: string,
|
||
parents: string[],
|
||
edges: FlowEdge[],
|
||
context: ExecutionContext
|
||
): any {
|
||
if (parents.length === 0) {
|
||
// 소스 노드: 원본 데이터 사용
|
||
return context.sourceData;
|
||
} else if (parents.length === 1) {
|
||
// 단일 부모: 부모의 결과 데이터 전달
|
||
const parentId = parents[0];
|
||
const parentResult = context.nodeResults.get(parentId);
|
||
let data = parentResult?.data || context.sourceData;
|
||
|
||
// 🔥 조건 노드에서 온 데이터인 경우 sourceHandle 확인
|
||
const edge = edges.find(
|
||
(e) => e.source === parentId && e.target === nodeId
|
||
);
|
||
if (
|
||
edge?.sourceHandle &&
|
||
data &&
|
||
typeof data === "object" &&
|
||
"conditionResult" in data
|
||
) {
|
||
// 조건 노드의 결과 객체
|
||
if (edge.sourceHandle === "true") {
|
||
logger.info(
|
||
`✅ TRUE 브랜치 데이터 사용: ${data.trueData?.length || 0}건`
|
||
);
|
||
return data.trueData || [];
|
||
} else if (edge.sourceHandle === "false") {
|
||
logger.info(
|
||
`✅ FALSE 브랜치 데이터 사용: ${data.falseData?.length || 0}건`
|
||
);
|
||
return data.falseData || [];
|
||
} else {
|
||
// sourceHandle이 없거나 다른 값이면 allData 사용
|
||
return data.allData || data;
|
||
}
|
||
}
|
||
|
||
return data;
|
||
} else {
|
||
// 다중 부모: 모든 부모의 데이터 병합
|
||
const allData: any[] = [];
|
||
|
||
parents.forEach((parentId) => {
|
||
const parentResult = context.nodeResults.get(parentId);
|
||
let data = parentResult?.data || context.sourceData;
|
||
|
||
// 🔥 조건 노드에서 온 데이터인 경우 sourceHandle 확인
|
||
const edge = edges.find(
|
||
(e) => e.source === parentId && e.target === nodeId
|
||
);
|
||
if (
|
||
edge?.sourceHandle &&
|
||
data &&
|
||
typeof data === "object" &&
|
||
"conditionResult" in data
|
||
) {
|
||
// 조건 노드의 결과 객체
|
||
if (edge.sourceHandle === "true") {
|
||
data = data.trueData || [];
|
||
} else if (edge.sourceHandle === "false") {
|
||
data = data.falseData || [];
|
||
} else {
|
||
data = data.allData || data;
|
||
}
|
||
}
|
||
|
||
// 배열이면 펼쳐서 추가
|
||
if (Array.isArray(data)) {
|
||
allData.push(...data);
|
||
} else {
|
||
allData.push(data);
|
||
}
|
||
});
|
||
|
||
logger.info(
|
||
`🔗 다중 부모 병합: ${parents.length}개 부모, 총 ${allData.length}건 데이터`
|
||
);
|
||
|
||
return allData;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 노드 타입별 실행 로직
|
||
*/
|
||
private static async executeNodeByType(
|
||
node: FlowNode,
|
||
inputData: any,
|
||
context: ExecutionContext,
|
||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||
): Promise<any> {
|
||
switch (node.type) {
|
||
case "tableSource":
|
||
return this.executeTableSource(node, context);
|
||
|
||
case "externalDBSource":
|
||
return this.executeExternalDBSource(node, context);
|
||
|
||
case "restAPISource":
|
||
return this.executeRestAPISource(node, context);
|
||
|
||
case "dataTransform":
|
||
return this.executeDataTransform(node, inputData, context);
|
||
|
||
case "insertAction":
|
||
return this.executeInsertAction(node, inputData, context, client);
|
||
|
||
case "updateAction":
|
||
return this.executeUpdateAction(node, inputData, context, client);
|
||
|
||
case "deleteAction":
|
||
return this.executeDeleteAction(node, inputData, context, client);
|
||
|
||
case "upsertAction":
|
||
return this.executeUpsertAction(node, inputData, context, client);
|
||
|
||
case "condition":
|
||
return this.executeCondition(node, inputData, context);
|
||
|
||
case "comment":
|
||
case "log":
|
||
// 로그/코멘트는 실행 없이 통과
|
||
logger.info(`📝 ${node.type}: ${node.data.displayName || node.id}`);
|
||
return { message: "Logged" };
|
||
|
||
default:
|
||
logger.warn(`⚠️ 지원하지 않는 노드 타입: ${node.type}`);
|
||
return { message: "Unsupported node type" };
|
||
}
|
||
}
|
||
|
||
/**
|
||
* REST API 소스 노드 실행
|
||
*/
|
||
private static async executeRestAPISource(
|
||
node: FlowNode,
|
||
context: ExecutionContext
|
||
): Promise<any[]> {
|
||
const {
|
||
url,
|
||
method = "GET",
|
||
headers = {},
|
||
body,
|
||
timeout = 30000,
|
||
responseMapping,
|
||
authentication,
|
||
} = node.data;
|
||
|
||
if (!url) {
|
||
throw new Error("REST API URL이 설정되지 않았습니다.");
|
||
}
|
||
|
||
logger.info(`🌐 REST API 호출: ${method} ${url}`);
|
||
|
||
try {
|
||
// 헤더 설정
|
||
const requestHeaders: any = { ...headers };
|
||
|
||
// 인증 헤더 추가
|
||
if (authentication) {
|
||
if (authentication.type === "bearer" && authentication.token) {
|
||
requestHeaders["Authorization"] = `Bearer ${authentication.token}`;
|
||
} else if (
|
||
authentication.type === "basic" &&
|
||
authentication.username &&
|
||
authentication.password
|
||
) {
|
||
const credentials = Buffer.from(
|
||
`${authentication.username}:${authentication.password}`
|
||
).toString("base64");
|
||
requestHeaders["Authorization"] = `Basic ${credentials}`;
|
||
} else if (authentication.type === "apikey" && authentication.token) {
|
||
const headerName = authentication.apiKeyHeader || "X-API-Key";
|
||
requestHeaders[headerName] = authentication.token;
|
||
}
|
||
}
|
||
|
||
if (!requestHeaders["Content-Type"]) {
|
||
requestHeaders["Content-Type"] = "application/json";
|
||
}
|
||
|
||
// API 호출
|
||
const response = await axios({
|
||
method: method.toLowerCase(),
|
||
url,
|
||
headers: requestHeaders,
|
||
data: body,
|
||
timeout,
|
||
});
|
||
|
||
logger.info(`✅ REST API 응답 수신: ${response.status}`);
|
||
|
||
let responseData = response.data;
|
||
|
||
// 🔥 표준 API 응답 형식 자동 감지 { success, message, data }
|
||
if (
|
||
!responseMapping &&
|
||
responseData &&
|
||
typeof responseData === "object" &&
|
||
"success" in responseData &&
|
||
"data" in responseData
|
||
) {
|
||
logger.info("🔍 표준 API 응답 형식 감지, data 속성 자동 추출");
|
||
responseData = responseData.data;
|
||
}
|
||
|
||
// responseMapping이 있으면 해당 경로의 데이터 추출
|
||
if (responseMapping && responseData) {
|
||
logger.info(`🔍 응답 매핑 적용: ${responseMapping}`);
|
||
const path = responseMapping.split(".");
|
||
for (const key of path) {
|
||
if (
|
||
responseData &&
|
||
typeof responseData === "object" &&
|
||
key in responseData
|
||
) {
|
||
responseData = responseData[key];
|
||
} else {
|
||
logger.warn(
|
||
`⚠️ 응답 매핑 경로를 찾을 수 없습니다: ${responseMapping}`
|
||
);
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
|
||
// 배열이 아니면 배열로 변환
|
||
if (!Array.isArray(responseData)) {
|
||
logger.info("🔄 단일 객체를 배열로 변환");
|
||
responseData = [responseData];
|
||
}
|
||
|
||
logger.info(`📦 REST API 데이터 ${responseData.length}건 반환`);
|
||
|
||
// 첫 번째 데이터 샘플 상세 로깅
|
||
if (responseData.length > 0) {
|
||
console.log("🔍 REST API 응답 데이터 샘플 (첫 번째 항목):");
|
||
console.log(JSON.stringify(responseData[0], null, 2));
|
||
console.log("🔑 사용 가능한 필드명:", Object.keys(responseData[0]));
|
||
}
|
||
|
||
return responseData;
|
||
} catch (error: any) {
|
||
logger.error(`❌ REST API 호출 실패:`, error.message);
|
||
throw new Error(`REST API 호출 실패: ${error.message}`);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 외부 DB 소스 노드 실행
|
||
*/
|
||
private static async executeExternalDBSource(
|
||
node: FlowNode,
|
||
context: ExecutionContext
|
||
): Promise<any[]> {
|
||
const { connectionId, tableName, schema, whereConditions, dataSourceType } =
|
||
node.data;
|
||
|
||
// 🆕 노드의 dataSourceType 확인 (기본값: context-data)
|
||
const nodeDataSourceType = dataSourceType || "context-data";
|
||
|
||
// 🆕 ExecutionContext에 현재 소스 노드의 dataSourceType 저장
|
||
context.currentNodeDataSourceType = nodeDataSourceType;
|
||
|
||
logger.info(
|
||
`🔌 외부 DB 소스 노드 실행: ${connectionId}.${tableName}, dataSourceType=${nodeDataSourceType}`
|
||
);
|
||
|
||
// 1. context-data 모드: 외부에서 주입된 데이터 사용
|
||
if (nodeDataSourceType === "context-data") {
|
||
if (
|
||
context.sourceData &&
|
||
Array.isArray(context.sourceData) &&
|
||
context.sourceData.length > 0
|
||
) {
|
||
logger.info(
|
||
`📊 컨텍스트 데이터 사용: ${context.dataSourceType}, ${context.sourceData.length}건`
|
||
);
|
||
return context.sourceData;
|
||
}
|
||
|
||
logger.warn(
|
||
`⚠️ context-data 모드이지만 전달된 데이터가 없습니다. 빈 배열 반환.`
|
||
);
|
||
return [];
|
||
}
|
||
|
||
// 2. table-all 모드: 외부 DB 테이블 전체 데이터 조회
|
||
if (nodeDataSourceType === "table-all") {
|
||
if (!connectionId || !tableName) {
|
||
throw new Error(
|
||
"외부 DB 연결 정보 또는 테이블명이 설정되지 않았습니다."
|
||
);
|
||
}
|
||
|
||
try {
|
||
// 연결 풀 서비스 임포트 (동적 임포트로 순환 참조 방지)
|
||
const { ExternalDbConnectionPoolService } = await import(
|
||
"./externalDbConnectionPoolService"
|
||
);
|
||
const poolService = ExternalDbConnectionPoolService.getInstance();
|
||
|
||
// 스키마 접두사 처리
|
||
const schemaPrefix = schema ? `${schema}.` : "";
|
||
const fullTableName = `${schemaPrefix}${tableName}`;
|
||
|
||
// WHERE 절 생성
|
||
let sql = `SELECT * FROM ${fullTableName}`;
|
||
let params: any[] = [];
|
||
|
||
if (whereConditions && whereConditions.length > 0) {
|
||
const whereResult = this.buildWhereClause(whereConditions);
|
||
sql += ` ${whereResult.clause}`;
|
||
params = whereResult.values;
|
||
}
|
||
|
||
logger.info(`📊 외부 DB 쿼리 실행: ${sql}`);
|
||
|
||
// 연결 풀을 통해 쿼리 실행
|
||
const result = await poolService.executeQuery(
|
||
connectionId,
|
||
sql,
|
||
params
|
||
);
|
||
|
||
logger.info(
|
||
`✅ 외부 DB 전체 데이터 조회 완료: ${tableName}, ${result.length}건`
|
||
);
|
||
|
||
return result;
|
||
} catch (error: any) {
|
||
logger.error(`❌ 외부 DB 소스 조회 실패:`, error);
|
||
throw new Error(
|
||
`외부 DB 조회 실패 (연결 ID: ${connectionId}): ${error.message}`
|
||
);
|
||
}
|
||
}
|
||
|
||
// 3. 알 수 없는 모드 (기본값으로 처리)
|
||
logger.warn(
|
||
`⚠️ 알 수 없는 dataSourceType: ${nodeDataSourceType}, context-data로 처리`
|
||
);
|
||
|
||
if (
|
||
context.sourceData &&
|
||
Array.isArray(context.sourceData) &&
|
||
context.sourceData.length > 0
|
||
) {
|
||
return context.sourceData;
|
||
}
|
||
|
||
return [];
|
||
}
|
||
|
||
/**
|
||
* 테이블 소스 노드 실행
|
||
*/
|
||
private static async executeTableSource(
|
||
node: FlowNode,
|
||
context: ExecutionContext
|
||
): Promise<any[]> {
|
||
const { tableName, schema, whereConditions, dataSourceType } = node.data;
|
||
|
||
// 🆕 노드의 dataSourceType 확인 (기본값: context-data)
|
||
const nodeDataSourceType = dataSourceType || "context-data";
|
||
|
||
// 🆕 ExecutionContext에 현재 소스 노드의 dataSourceType 저장
|
||
context.currentNodeDataSourceType = nodeDataSourceType;
|
||
|
||
logger.info(
|
||
`📊 테이블 소스 노드 실행: ${tableName}, dataSourceType=${nodeDataSourceType}`
|
||
);
|
||
|
||
// 1. context-data 모드: 외부에서 주입된 데이터 사용
|
||
if (nodeDataSourceType === "context-data") {
|
||
if (
|
||
context.sourceData &&
|
||
Array.isArray(context.sourceData) &&
|
||
context.sourceData.length > 0
|
||
) {
|
||
logger.info(
|
||
`📊 컨텍스트 데이터 사용: ${context.dataSourceType}, ${context.sourceData.length}건`
|
||
);
|
||
return context.sourceData;
|
||
}
|
||
|
||
logger.warn(
|
||
`⚠️ context-data 모드이지만 전달된 데이터가 없습니다. 빈 배열 반환.`
|
||
);
|
||
return [];
|
||
}
|
||
|
||
// 2. table-all 모드: 테이블 전체 데이터 조회
|
||
if (nodeDataSourceType === "table-all") {
|
||
if (!tableName) {
|
||
logger.warn("⚠️ 테이블 소스 노드에 테이블명이 없습니다.");
|
||
return [];
|
||
}
|
||
|
||
const schemaPrefix = schema ? `${schema}.` : "";
|
||
const whereResult = whereConditions
|
||
? this.buildWhereClause(whereConditions)
|
||
: { clause: "", values: [] };
|
||
|
||
const sql = `SELECT * FROM ${schemaPrefix}${tableName} ${whereResult.clause}`;
|
||
|
||
const result = await query(sql, whereResult.values);
|
||
|
||
logger.info(
|
||
`📊 테이블 전체 데이터 조회: ${tableName}, ${result.length}건`
|
||
);
|
||
|
||
return result;
|
||
}
|
||
|
||
// 3. 알 수 없는 모드 (기본값으로 처리)
|
||
logger.warn(
|
||
`⚠️ 알 수 없는 dataSourceType: ${nodeDataSourceType}, context-data로 처리`
|
||
);
|
||
|
||
if (
|
||
context.sourceData &&
|
||
Array.isArray(context.sourceData) &&
|
||
context.sourceData.length > 0
|
||
) {
|
||
return context.sourceData;
|
||
}
|
||
|
||
return [];
|
||
}
|
||
|
||
/**
|
||
* INSERT 액션 노드 실행
|
||
*/
|
||
private static async executeInsertAction(
|
||
node: FlowNode,
|
||
inputData: any,
|
||
context: ExecutionContext,
|
||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||
): Promise<any> {
|
||
const { targetType } = node.data;
|
||
|
||
// 🔥 타겟 타입별 분기
|
||
switch (targetType) {
|
||
case "internal":
|
||
return this.executeInternalInsert(node, inputData, context, client);
|
||
|
||
case "external":
|
||
return this.executeExternalInsert(node, inputData, context);
|
||
|
||
case "api":
|
||
return this.executeApiInsert(node, inputData, context);
|
||
|
||
default:
|
||
// 하위 호환성: targetType이 없으면 internal로 간주
|
||
logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
|
||
return this.executeInternalInsert(node, inputData, context, client);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 내부 DB INSERT 실행
|
||
*/
|
||
private static async executeInternalInsert(
|
||
node: FlowNode,
|
||
inputData: any,
|
||
context: ExecutionContext,
|
||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||
): Promise<any> {
|
||
const { targetTable, fieldMappings } = node.data;
|
||
|
||
logger.info(`💾 INSERT 노드 실행: ${targetTable}`);
|
||
console.log(
|
||
"📥 입력 데이터 타입:",
|
||
typeof inputData,
|
||
Array.isArray(inputData) ? `배열(${inputData.length}건)` : "단일 객체"
|
||
);
|
||
|
||
if (inputData && inputData.length > 0) {
|
||
console.log("📄 첫 번째 입력 데이터:");
|
||
console.log(JSON.stringify(inputData[0], null, 2));
|
||
console.log("🔑 입력 데이터 필드명:", Object.keys(inputData[0]));
|
||
}
|
||
|
||
// 🔥 트랜잭션 클라이언트가 있으면 사용, 없으면 독립 트랜잭션
|
||
const executeInsert = async (txClient: any) => {
|
||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||
let insertedCount = 0;
|
||
const insertedDataArray: any[] = [];
|
||
|
||
for (const data of dataArray) {
|
||
const fields: string[] = [];
|
||
const values: any[] = [];
|
||
|
||
// 🔥 삽입된 데이터 복사본 생성
|
||
const insertedData = { ...data };
|
||
|
||
console.log("🗺️ 필드 매핑 처리 중...");
|
||
fieldMappings.forEach((mapping: any) => {
|
||
fields.push(mapping.targetField);
|
||
const value =
|
||
mapping.staticValue !== undefined
|
||
? mapping.staticValue
|
||
: data[mapping.sourceField];
|
||
|
||
console.log(
|
||
` ${mapping.sourceField} → ${mapping.targetField}: ${value === undefined ? "❌ undefined" : "✅ " + value}`
|
||
);
|
||
values.push(value);
|
||
|
||
// 🔥 삽입된 값을 데이터에 반영
|
||
insertedData[mapping.targetField] = value;
|
||
});
|
||
|
||
const sql = `
|
||
INSERT INTO ${targetTable} (${fields.join(", ")})
|
||
VALUES (${fields.map((_, i) => `$${i + 1}`).join(", ")})
|
||
RETURNING *
|
||
`;
|
||
|
||
console.log("📝 실행할 SQL:", sql);
|
||
console.log("📊 바인딩 값:", values);
|
||
|
||
const result = await txClient.query(sql, values);
|
||
insertedCount++;
|
||
|
||
// 🔥 RETURNING으로 받은 실제 삽입 데이터 사용 (AUTO_INCREMENT 등 포함)
|
||
if (result.rows && result.rows.length > 0) {
|
||
insertedDataArray.push(result.rows[0]);
|
||
} else {
|
||
// RETURNING이 없으면 생성한 데이터 사용
|
||
insertedDataArray.push(insertedData);
|
||
}
|
||
}
|
||
|
||
logger.info(
|
||
`✅ INSERT 완료 (내부 DB): ${targetTable}, ${insertedCount}건`
|
||
);
|
||
|
||
// 🔥 삽입된 데이터 반환 (AUTO_INCREMENT ID 등 포함)
|
||
return insertedDataArray;
|
||
};
|
||
|
||
// 🔥 클라이언트가 전달되었으면 사용, 아니면 독립 트랜잭션 생성
|
||
if (client) {
|
||
return executeInsert(client);
|
||
} else {
|
||
return transaction(executeInsert);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 외부 DB INSERT 실행
|
||
*/
|
||
private static async executeExternalInsert(
|
||
node: FlowNode,
|
||
inputData: any,
|
||
context: ExecutionContext
|
||
): Promise<any> {
|
||
const {
|
||
externalConnectionId,
|
||
externalDbType,
|
||
externalTargetTable,
|
||
fieldMappings,
|
||
} = node.data;
|
||
|
||
if (!externalConnectionId || !externalTargetTable) {
|
||
throw new Error("외부 DB 커넥션 또는 테이블이 설정되지 않았습니다.");
|
||
}
|
||
|
||
logger.info(
|
||
`🔌 외부 DB INSERT 시작: ${externalDbType} - ${externalTargetTable}`
|
||
);
|
||
|
||
// 외부 DB 커넥터 생성
|
||
const connector = await this.createExternalConnector(
|
||
externalConnectionId,
|
||
externalDbType
|
||
);
|
||
|
||
try {
|
||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||
let insertedCount = 0;
|
||
const insertedDataArray: any[] = [];
|
||
|
||
// 🔥 Oracle의 경우 autoCommit을 false로 설정하여 트랜잭션 제어
|
||
const isOracle = externalDbType.toLowerCase() === "oracle";
|
||
|
||
for (const data of dataArray) {
|
||
const fields: string[] = [];
|
||
const values: any[] = [];
|
||
const insertedData: any = { ...data };
|
||
|
||
fieldMappings.forEach((mapping: any) => {
|
||
fields.push(mapping.targetField);
|
||
const value =
|
||
mapping.staticValue !== undefined
|
||
? mapping.staticValue
|
||
: data[mapping.sourceField];
|
||
values.push(value);
|
||
// 🔥 삽입된 데이터 객체에 매핑된 값 적용
|
||
insertedData[mapping.targetField] = value;
|
||
});
|
||
|
||
// 외부 DB별 SQL 문법 차이 처리
|
||
let sql: string;
|
||
let params: any[];
|
||
|
||
if (isOracle) {
|
||
// Oracle: :1, :2, ... 형식
|
||
const placeholders = fields.map((_, i) => `:${i + 1}`).join(", ");
|
||
sql = `INSERT INTO ${externalTargetTable} (${fields.join(", ")}) VALUES (${placeholders})`;
|
||
params = values;
|
||
} else if (
|
||
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
|
||
) {
|
||
// MySQL/MariaDB: ? 형식
|
||
const placeholders = fields.map(() => "?").join(", ");
|
||
sql = `INSERT INTO ${externalTargetTable} (${fields.join(", ")}) VALUES (${placeholders})`;
|
||
params = values;
|
||
} else if (externalDbType.toLowerCase() === "mssql") {
|
||
// MSSQL: @p1, @p2, ... 형식
|
||
const placeholders = fields.map((_, i) => `@p${i + 1}`).join(", ");
|
||
sql = `INSERT INTO ${externalTargetTable} (${fields.join(", ")}) VALUES (${placeholders})`;
|
||
params = values;
|
||
} else {
|
||
// PostgreSQL: $1, $2, ... 형식 (기본)
|
||
const placeholders = fields.map((_, i) => `$${i + 1}`).join(", ");
|
||
sql = `INSERT INTO ${externalTargetTable} (${fields.join(", ")}) VALUES (${placeholders})`;
|
||
params = values;
|
||
}
|
||
|
||
await connector.executeQuery(sql, params);
|
||
insertedCount++;
|
||
insertedDataArray.push(insertedData);
|
||
}
|
||
|
||
// 🔥 Oracle의 경우 명시적 COMMIT
|
||
await this.commitExternalTransaction(
|
||
connector,
|
||
externalDbType,
|
||
insertedCount
|
||
);
|
||
|
||
logger.info(
|
||
`✅ INSERT 완료 (외부 DB): ${externalTargetTable}, ${insertedCount}건`
|
||
);
|
||
|
||
// 🔥 삽입된 데이터 반환 (외부 DB는 자동 생성 ID 없으므로 입력 데이터 기반)
|
||
return insertedDataArray;
|
||
} catch (error) {
|
||
// 🔥 Oracle의 경우 오류 시 ROLLBACK
|
||
await this.rollbackExternalTransaction(connector, externalDbType);
|
||
throw error;
|
||
} finally {
|
||
// 연결 해제
|
||
await connector.disconnect();
|
||
}
|
||
}
|
||
|
||
/**
|
||
* REST API INSERT 실행 (POST 요청)
|
||
*/
|
||
private static async executeApiInsert(
|
||
node: FlowNode,
|
||
inputData: any,
|
||
context: ExecutionContext
|
||
): Promise<any> {
|
||
const {
|
||
apiEndpoint,
|
||
apiMethod,
|
||
apiAuthType,
|
||
apiAuthConfig,
|
||
apiHeaders,
|
||
apiBodyTemplate,
|
||
fieldMappings,
|
||
} = node.data;
|
||
|
||
if (!apiEndpoint) {
|
||
throw new Error("API 엔드포인트가 설정되지 않았습니다.");
|
||
}
|
||
|
||
logger.info(`🌐 REST API INSERT 시작: ${apiMethod} ${apiEndpoint}`);
|
||
|
||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||
const results: any[] = [];
|
||
|
||
for (const data of dataArray) {
|
||
// 헤더 설정
|
||
const headers: any = { ...apiHeaders };
|
||
|
||
// 인증 헤더 추가
|
||
if (apiAuthType === "bearer" && apiAuthConfig?.token) {
|
||
headers["Authorization"] = `Bearer ${apiAuthConfig.token}`;
|
||
} else if (
|
||
apiAuthType === "basic" &&
|
||
apiAuthConfig?.username &&
|
||
apiAuthConfig?.password
|
||
) {
|
||
const credentials = Buffer.from(
|
||
`${apiAuthConfig.username}:${apiAuthConfig.password}`
|
||
).toString("base64");
|
||
headers["Authorization"] = `Basic ${credentials}`;
|
||
} else if (apiAuthType === "apikey" && apiAuthConfig?.apiKey) {
|
||
const headerName = apiAuthConfig.apiKeyHeader || "X-API-Key";
|
||
headers[headerName] = apiAuthConfig.apiKey;
|
||
}
|
||
|
||
// Content-Type 기본값 설정
|
||
if (!headers["Content-Type"]) {
|
||
headers["Content-Type"] = "application/json";
|
||
}
|
||
|
||
// 바디 생성 (템플릿 또는 필드 매핑)
|
||
let body: any;
|
||
|
||
if (apiBodyTemplate) {
|
||
// 템플릿 변수 치환
|
||
body = this.replaceTemplateVariables(apiBodyTemplate, data);
|
||
} else if (fieldMappings && fieldMappings.length > 0) {
|
||
// 필드 매핑 사용
|
||
body = {};
|
||
fieldMappings.forEach((mapping: any) => {
|
||
const value =
|
||
mapping.staticValue !== undefined
|
||
? mapping.staticValue
|
||
: data[mapping.sourceField];
|
||
body[mapping.targetField] = value;
|
||
});
|
||
} else {
|
||
// 전체 데이터 전송
|
||
body = data;
|
||
}
|
||
|
||
try {
|
||
const response = await axios({
|
||
method: apiMethod || "POST",
|
||
url: apiEndpoint,
|
||
headers,
|
||
data: body,
|
||
timeout: 30000, // 30초 타임아웃
|
||
});
|
||
|
||
results.push({
|
||
status: response.status,
|
||
data: response.data,
|
||
});
|
||
} catch (error: any) {
|
||
logger.error(
|
||
`❌ API 요청 실패: ${error.response?.status || error.message}`
|
||
);
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
logger.info(`✅ REST API INSERT 완료: ${results.length}건`);
|
||
|
||
return { results };
|
||
}
|
||
|
||
/**
|
||
* 템플릿 변수 치환 ({{variable}} 형식)
|
||
*/
|
||
private static replaceTemplateVariables(template: string, data: any): string {
|
||
let result = template;
|
||
|
||
// {{variable}} 형식의 변수를 찾아서 치환
|
||
const matches = template.match(/\{\{([^}]+)\}\}/g);
|
||
if (matches) {
|
||
matches.forEach((match) => {
|
||
const key = match.replace(/\{\{|\}\}/g, "").trim();
|
||
const value = this.getNestedValue(data, key);
|
||
result = result.replace(match, value !== undefined ? value : "");
|
||
});
|
||
}
|
||
|
||
return result;
|
||
}
|
||
|
||
/**
|
||
* 중첩된 객체 값 가져오기 (예: "user.name")
|
||
*/
|
||
private static getNestedValue(obj: any, path: string): any {
|
||
return path.split(".").reduce((current, key) => current?.[key], obj);
|
||
}
|
||
|
||
/**
|
||
* 외부 DB 커넥터 생성 (공통 로직)
|
||
*/
|
||
private static async createExternalConnector(
|
||
connectionId: number,
|
||
dbType: string
|
||
): Promise<any> {
|
||
// 🔥 연결 풀 서비스를 통한 연결 관리 (연결 풀 고갈 방지)
|
||
const { ExternalDbConnectionPoolService } = await import(
|
||
"./externalDbConnectionPoolService"
|
||
);
|
||
const poolService = ExternalDbConnectionPoolService.getInstance();
|
||
const pool = await poolService.getPool(connectionId);
|
||
|
||
// DatabaseConnectorFactory와 호환되도록 래퍼 객체 반환
|
||
return {
|
||
executeQuery: async (sql: string, params?: any[]) => {
|
||
const result = await pool.query(sql, params);
|
||
return {
|
||
rows: Array.isArray(result) ? result : [result],
|
||
rowCount: Array.isArray(result) ? result.length : 1,
|
||
affectedRows: Array.isArray(result) ? result.length : 1,
|
||
};
|
||
},
|
||
disconnect: async () => {
|
||
// 연결 풀은 자동 관리되므로 즉시 종료하지 않음
|
||
logger.debug(`📌 연결 풀 유지 (ID: ${connectionId})`);
|
||
},
|
||
};
|
||
}
|
||
|
||
/**
|
||
* 외부 DB 트랜잭션 커밋 (Oracle 전용)
|
||
*/
|
||
private static async commitExternalTransaction(
|
||
connector: any,
|
||
dbType: string,
|
||
count: number
|
||
): Promise<void> {
|
||
if (dbType.toLowerCase() === "oracle" && count > 0) {
|
||
await connector.executeQuery("COMMIT");
|
||
logger.info(`✅ Oracle COMMIT 실행: ${count}건`);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 외부 DB 트랜잭션 롤백 (Oracle 전용)
|
||
*/
|
||
private static async rollbackExternalTransaction(
|
||
connector: any,
|
||
dbType: string
|
||
): Promise<void> {
|
||
if (dbType.toLowerCase() === "oracle") {
|
||
try {
|
||
await connector.executeQuery("ROLLBACK");
|
||
logger.info(`⚠️ Oracle ROLLBACK 실행 (오류 발생)`);
|
||
} catch (rollbackError) {
|
||
logger.error(`❌ Oracle ROLLBACK 실패:`, rollbackError);
|
||
}
|
||
}
|
||
}
|
||
|
||
/**
|
||
* UPDATE 액션 노드 실행
|
||
*/
|
||
private static async executeUpdateAction(
|
||
node: FlowNode,
|
||
inputData: any,
|
||
context: ExecutionContext,
|
||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||
): Promise<any> {
|
||
const { targetType } = node.data;
|
||
|
||
// 🔥 타겟 타입별 분기
|
||
switch (targetType) {
|
||
case "internal":
|
||
return this.executeInternalUpdate(node, inputData, context, client);
|
||
|
||
case "external":
|
||
return this.executeExternalUpdate(node, inputData, context);
|
||
|
||
case "api":
|
||
return this.executeApiUpdate(node, inputData, context);
|
||
|
||
default:
|
||
// 하위 호환성: targetType이 없으면 internal로 간주
|
||
logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
|
||
return this.executeInternalUpdate(node, inputData, context, client);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 내부 DB UPDATE 실행
|
||
*/
|
||
private static async executeInternalUpdate(
|
||
node: FlowNode,
|
||
inputData: any,
|
||
context: ExecutionContext,
|
||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||
): Promise<any> {
|
||
const { targetTable, fieldMappings, whereConditions } = node.data;
|
||
|
||
logger.info(`🔄 UPDATE 노드 실행: ${targetTable}`);
|
||
console.log(
|
||
"📥 입력 데이터 타입:",
|
||
typeof inputData,
|
||
Array.isArray(inputData) ? `배열(${inputData.length}건)` : "단일 객체"
|
||
);
|
||
|
||
if (inputData && inputData.length > 0) {
|
||
console.log("📄 첫 번째 입력 데이터:");
|
||
console.log(JSON.stringify(inputData[0], null, 2));
|
||
console.log("🔑 입력 데이터 필드명:", Object.keys(inputData[0]));
|
||
}
|
||
|
||
// 🔥 트랜잭션 클라이언트가 있으면 사용, 없으면 독립 트랜잭션
|
||
const executeUpdate = async (txClient: any) => {
|
||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||
let updatedCount = 0;
|
||
const updatedDataArray: any[] = [];
|
||
|
||
// 🆕 table-all 모드: 단일 SQL로 일괄 업데이트
|
||
if (context.currentNodeDataSourceType === "table-all") {
|
||
console.log("🚀 table-all 모드: 단일 SQL로 일괄 업데이트 시작");
|
||
|
||
// 첫 번째 데이터를 참조하여 SET 절 생성
|
||
const firstData = dataArray[0];
|
||
const setClauses: string[] = [];
|
||
const values: any[] = [];
|
||
let paramIndex = 1;
|
||
|
||
console.log("🗺️ 필드 매핑 처리 중...");
|
||
fieldMappings.forEach((mapping: any) => {
|
||
const value =
|
||
mapping.staticValue !== undefined
|
||
? mapping.staticValue
|
||
: firstData[mapping.sourceField];
|
||
|
||
console.log(
|
||
` ${mapping.sourceField} → ${mapping.targetField}: ${value === undefined ? "❌ undefined" : "✅ " + value}`
|
||
);
|
||
|
||
if (mapping.targetField) {
|
||
setClauses.push(`${mapping.targetField} = $${paramIndex}`);
|
||
values.push(value);
|
||
paramIndex++;
|
||
}
|
||
});
|
||
|
||
// WHERE 조건 (사용자 정의 조건만 사용, PK 자동 추가 안 함)
|
||
const whereResult = this.buildWhereClause(
|
||
whereConditions,
|
||
firstData,
|
||
paramIndex
|
||
);
|
||
|
||
values.push(...whereResult.values);
|
||
|
||
const sql = `
|
||
UPDATE ${targetTable}
|
||
SET ${setClauses.join(", ")}
|
||
${whereResult.clause}
|
||
`;
|
||
|
||
console.log("📝 실행할 SQL (일괄 처리):", sql);
|
||
console.log("📊 바인딩 값:", values);
|
||
|
||
const result = await txClient.query(sql, values);
|
||
updatedCount = result.rowCount || 0;
|
||
|
||
logger.info(
|
||
`✅ UPDATE 완료 (내부 DB, 일괄 처리): ${targetTable}, ${updatedCount}건`
|
||
);
|
||
|
||
// 업데이트된 데이터는 원본 배열 반환 (실제 DB에서 다시 조회하지 않음)
|
||
return dataArray;
|
||
}
|
||
|
||
// 🆕 context-data 모드: 개별 업데이트 (PK 자동 추가)
|
||
console.log("🎯 context-data 모드: 개별 업데이트 시작");
|
||
|
||
for (const data of dataArray) {
|
||
const setClauses: string[] = [];
|
||
const values: any[] = [];
|
||
let paramIndex = 1;
|
||
|
||
// 🔥 업데이트된 데이터 복사본 생성
|
||
const updatedData = { ...data };
|
||
|
||
console.log("🗺️ 필드 매핑 처리 중...");
|
||
fieldMappings.forEach((mapping: any) => {
|
||
const value =
|
||
mapping.staticValue !== undefined
|
||
? mapping.staticValue
|
||
: data[mapping.sourceField];
|
||
|
||
console.log(
|
||
` ${mapping.sourceField} → ${mapping.targetField}: ${value === undefined ? "❌ undefined" : "✅ " + value}`
|
||
);
|
||
|
||
// targetField가 비어있지 않은 경우만 추가
|
||
if (mapping.targetField) {
|
||
setClauses.push(`${mapping.targetField} = $${paramIndex}`);
|
||
values.push(value);
|
||
paramIndex++;
|
||
|
||
// 🔥 업데이트된 값을 데이터에 반영
|
||
updatedData[mapping.targetField] = value;
|
||
} else {
|
||
console.log(
|
||
`⚠️ targetField가 비어있어 스킵: ${mapping.sourceField}`
|
||
);
|
||
}
|
||
});
|
||
|
||
// 🔑 Primary Key 자동 추가 (context-data 모드)
|
||
console.log("🔑 context-data 모드: Primary Key 자동 추가");
|
||
const enhancedWhereConditions = await this.enhanceWhereConditionsWithPK(
|
||
whereConditions,
|
||
data,
|
||
targetTable
|
||
);
|
||
|
||
const whereResult = this.buildWhereClause(
|
||
enhancedWhereConditions,
|
||
data,
|
||
paramIndex
|
||
);
|
||
|
||
// WHERE 절의 값들을 values 배열에 추가
|
||
values.push(...whereResult.values);
|
||
|
||
const sql = `
|
||
UPDATE ${targetTable}
|
||
SET ${setClauses.join(", ")}
|
||
${whereResult.clause}
|
||
`;
|
||
|
||
console.log("📝 실행할 SQL:", sql);
|
||
console.log("📊 바인딩 값:", values);
|
||
|
||
const result = await txClient.query(sql, values);
|
||
updatedCount += result.rowCount || 0;
|
||
|
||
// 🔥 업데이트된 데이터 저장
|
||
updatedDataArray.push(updatedData);
|
||
}
|
||
|
||
logger.info(
|
||
`✅ UPDATE 완료 (내부 DB): ${targetTable}, ${updatedCount}건`
|
||
);
|
||
|
||
// 🔥 업데이트된 데이터 반환 (다음 노드에서 사용)
|
||
return updatedDataArray;
|
||
};
|
||
|
||
// 🔥 클라이언트가 전달되었으면 사용, 없으면 독립 트랜잭션 생성
|
||
if (client) {
|
||
return executeUpdate(client);
|
||
} else {
|
||
return transaction(executeUpdate);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 외부 DB UPDATE 실행
|
||
*/
|
||
private static async executeExternalUpdate(
|
||
node: FlowNode,
|
||
inputData: any,
|
||
context: ExecutionContext
|
||
): Promise<any> {
|
||
const {
|
||
externalConnectionId,
|
||
externalDbType,
|
||
externalTargetTable,
|
||
fieldMappings,
|
||
whereConditions,
|
||
} = node.data;
|
||
|
||
if (!externalConnectionId || !externalTargetTable) {
|
||
throw new Error("외부 DB 커넥션 또는 테이블이 설정되지 않았습니다.");
|
||
}
|
||
|
||
logger.info(
|
||
`🔌 외부 DB UPDATE 시작: ${externalDbType} - ${externalTargetTable}`
|
||
);
|
||
|
||
// 외부 DB 커넥터 생성
|
||
const connector = await this.createExternalConnector(
|
||
externalConnectionId,
|
||
externalDbType
|
||
);
|
||
|
||
try {
|
||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||
let updatedCount = 0;
|
||
const updatedDataArray: any[] = [];
|
||
|
||
for (const data of dataArray) {
|
||
const setClauses: string[] = [];
|
||
const values: any[] = [];
|
||
let paramIndex = 1;
|
||
const updatedData: any = { ...data };
|
||
|
||
fieldMappings.forEach((mapping: any) => {
|
||
const value =
|
||
mapping.staticValue !== undefined
|
||
? mapping.staticValue
|
||
: data[mapping.sourceField];
|
||
|
||
// DB별 플레이스홀더
|
||
if (externalDbType.toLowerCase() === "oracle") {
|
||
setClauses.push(`${mapping.targetField} = :${paramIndex}`);
|
||
} else if (
|
||
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
|
||
) {
|
||
setClauses.push(`${mapping.targetField} = ?`);
|
||
} else if (externalDbType.toLowerCase() === "mssql") {
|
||
setClauses.push(`${mapping.targetField} = @p${paramIndex}`);
|
||
} else {
|
||
setClauses.push(`${mapping.targetField} = $${paramIndex}`);
|
||
}
|
||
|
||
values.push(value);
|
||
paramIndex++;
|
||
// 🔥 업데이트된 데이터 객체에 매핑된 값 적용
|
||
updatedData[mapping.targetField] = value;
|
||
});
|
||
|
||
// WHERE 조건 생성
|
||
const whereClauses: string[] = [];
|
||
whereConditions?.forEach((condition: any) => {
|
||
const condValue = data[condition.field];
|
||
|
||
if (condition.operator === "IS NULL") {
|
||
whereClauses.push(`${condition.field} IS NULL`);
|
||
} else if (condition.operator === "IS NOT NULL") {
|
||
whereClauses.push(`${condition.field} IS NOT NULL`);
|
||
} else {
|
||
if (externalDbType.toLowerCase() === "oracle") {
|
||
whereClauses.push(
|
||
`${condition.field} ${condition.operator} :${paramIndex}`
|
||
);
|
||
} else if (
|
||
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
|
||
) {
|
||
whereClauses.push(`${condition.field} ${condition.operator} ?`);
|
||
} else if (externalDbType.toLowerCase() === "mssql") {
|
||
whereClauses.push(
|
||
`${condition.field} ${condition.operator} @p${paramIndex}`
|
||
);
|
||
} else {
|
||
whereClauses.push(
|
||
`${condition.field} ${condition.operator} $${paramIndex}`
|
||
);
|
||
}
|
||
values.push(condValue);
|
||
paramIndex++;
|
||
}
|
||
});
|
||
|
||
const whereClause =
|
||
whereClauses.length > 0 ? `WHERE ${whereClauses.join(" AND ")}` : "";
|
||
|
||
const sql = `UPDATE ${externalTargetTable} SET ${setClauses.join(", ")} ${whereClause}`;
|
||
|
||
const result = await connector.executeQuery(sql, values);
|
||
updatedCount += result.rowCount || result.affectedRows || 0;
|
||
updatedDataArray.push(updatedData);
|
||
}
|
||
|
||
// 🔥 Oracle의 경우 명시적 COMMIT
|
||
await this.commitExternalTransaction(
|
||
connector,
|
||
externalDbType,
|
||
updatedCount
|
||
);
|
||
|
||
logger.info(
|
||
`✅ UPDATE 완료 (외부 DB): ${externalTargetTable}, ${updatedCount}건`
|
||
);
|
||
|
||
// 🔥 업데이트된 데이터 반환
|
||
return updatedDataArray;
|
||
} catch (error) {
|
||
// 🔥 Oracle의 경우 오류 시 ROLLBACK
|
||
await this.rollbackExternalTransaction(connector, externalDbType);
|
||
throw error;
|
||
} finally {
|
||
await connector.disconnect();
|
||
}
|
||
}
|
||
|
||
/**
|
||
* REST API UPDATE 실행 (PUT/PATCH 요청)
|
||
*/
|
||
private static async executeApiUpdate(
|
||
node: FlowNode,
|
||
inputData: any,
|
||
context: ExecutionContext
|
||
): Promise<any> {
|
||
const {
|
||
apiEndpoint,
|
||
apiMethod,
|
||
apiAuthType,
|
||
apiAuthConfig,
|
||
apiHeaders,
|
||
apiBodyTemplate,
|
||
fieldMappings,
|
||
} = node.data;
|
||
|
||
if (!apiEndpoint) {
|
||
throw new Error("API 엔드포인트가 설정되지 않았습니다.");
|
||
}
|
||
|
||
logger.info(`🌐 REST API UPDATE 시작: ${apiMethod} ${apiEndpoint}`);
|
||
|
||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||
const results: any[] = [];
|
||
|
||
for (const data of dataArray) {
|
||
// 헤더 설정
|
||
const headers: any = { ...apiHeaders };
|
||
|
||
// 인증 헤더 추가
|
||
if (apiAuthType === "bearer" && apiAuthConfig?.token) {
|
||
headers["Authorization"] = `Bearer ${apiAuthConfig.token}`;
|
||
} else if (
|
||
apiAuthType === "basic" &&
|
||
apiAuthConfig?.username &&
|
||
apiAuthConfig?.password
|
||
) {
|
||
const credentials = Buffer.from(
|
||
`${apiAuthConfig.username}:${apiAuthConfig.password}`
|
||
).toString("base64");
|
||
headers["Authorization"] = `Basic ${credentials}`;
|
||
} else if (apiAuthType === "apikey" && apiAuthConfig?.apiKey) {
|
||
const headerName = apiAuthConfig.apiKeyHeader || "X-API-Key";
|
||
headers[headerName] = apiAuthConfig.apiKey;
|
||
}
|
||
|
||
if (!headers["Content-Type"]) {
|
||
headers["Content-Type"] = "application/json";
|
||
}
|
||
|
||
// 바디 생성
|
||
let body: any;
|
||
|
||
if (apiBodyTemplate) {
|
||
body = this.replaceTemplateVariables(apiBodyTemplate, data);
|
||
} else if (fieldMappings && fieldMappings.length > 0) {
|
||
body = {};
|
||
fieldMappings.forEach((mapping: any) => {
|
||
const value =
|
||
mapping.staticValue !== undefined
|
||
? mapping.staticValue
|
||
: data[mapping.sourceField];
|
||
body[mapping.targetField] = value;
|
||
});
|
||
} else {
|
||
body = data;
|
||
}
|
||
|
||
try {
|
||
const response = await axios({
|
||
method: apiMethod || "PUT",
|
||
url: apiEndpoint,
|
||
headers,
|
||
data: body,
|
||
timeout: 30000,
|
||
});
|
||
|
||
results.push({
|
||
status: response.status,
|
||
data: response.data,
|
||
});
|
||
} catch (error: any) {
|
||
logger.error(
|
||
`❌ API 요청 실패: ${error.response?.status || error.message}`
|
||
);
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
logger.info(`✅ REST API UPDATE 완료: ${results.length}건`);
|
||
|
||
return { results };
|
||
}
|
||
|
||
/**
|
||
* DELETE 액션 노드 실행
|
||
*/
|
||
private static async executeDeleteAction(
|
||
node: FlowNode,
|
||
inputData: any,
|
||
context: ExecutionContext,
|
||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||
): Promise<any> {
|
||
const { targetType } = node.data;
|
||
|
||
// 🔥 타겟 타입별 분기
|
||
switch (targetType) {
|
||
case "internal":
|
||
return this.executeInternalDelete(node, inputData, context, client);
|
||
|
||
case "external":
|
||
return this.executeExternalDelete(node, inputData, context);
|
||
|
||
case "api":
|
||
return this.executeApiDelete(node, inputData, context);
|
||
|
||
default:
|
||
// 하위 호환성: targetType이 없으면 internal로 간주
|
||
logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
|
||
return this.executeInternalDelete(node, inputData, context, client);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 내부 DB DELETE 실행
|
||
*/
|
||
private static async executeInternalDelete(
|
||
node: FlowNode,
|
||
inputData: any,
|
||
context: ExecutionContext,
|
||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||
): Promise<any> {
|
||
const { targetTable, whereConditions } = node.data;
|
||
|
||
logger.info(`🗑️ DELETE 노드 실행: ${targetTable}`);
|
||
console.log(
|
||
"📥 입력 데이터 타입:",
|
||
typeof inputData,
|
||
Array.isArray(inputData) ? `배열(${inputData.length}건)` : "단일 객체"
|
||
);
|
||
|
||
if (inputData && inputData.length > 0) {
|
||
console.log("📄 첫 번째 입력 데이터:");
|
||
console.log(JSON.stringify(inputData[0], null, 2));
|
||
console.log("🔑 입력 데이터 필드명:", Object.keys(inputData[0]));
|
||
}
|
||
|
||
// 🔥 트랜잭션 클라이언트가 있으면 사용, 없으면 독립 트랜잭션
|
||
const executeDelete = async (txClient: any) => {
|
||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||
let deletedCount = 0;
|
||
const deletedDataArray: any[] = [];
|
||
|
||
// 🆕 table-all 모드: 단일 SQL로 일괄 삭제
|
||
if (context.currentNodeDataSourceType === "table-all") {
|
||
console.log("🚀 table-all 모드: 단일 SQL로 일괄 삭제 시작");
|
||
|
||
// 첫 번째 데이터를 참조하여 WHERE 절 생성
|
||
const firstData = dataArray[0];
|
||
|
||
// WHERE 조건 (사용자 정의 조건만 사용, PK 자동 추가 안 함)
|
||
const whereResult = this.buildWhereClause(whereConditions, firstData, 1);
|
||
|
||
const sql = `DELETE FROM ${targetTable} ${whereResult.clause} RETURNING *`;
|
||
|
||
console.log("📝 실행할 SQL (일괄 처리):", sql);
|
||
console.log("📊 바인딩 값:", whereResult.values);
|
||
|
||
const result = await txClient.query(sql, whereResult.values);
|
||
deletedCount = result.rowCount || 0;
|
||
|
||
// 🔥 RETURNING으로 받은 삭제된 데이터 저장
|
||
if (result.rows && result.rows.length > 0) {
|
||
deletedDataArray.push(...result.rows);
|
||
}
|
||
|
||
logger.info(
|
||
`✅ DELETE 완료 (내부 DB, 일괄 처리): ${targetTable}, ${deletedCount}건`
|
||
);
|
||
|
||
return deletedDataArray;
|
||
}
|
||
|
||
// 🆕 context-data 모드: 개별 삭제 (PK 자동 추가)
|
||
console.log("🎯 context-data 모드: 개별 삭제 시작");
|
||
|
||
for (const data of dataArray) {
|
||
console.log("🔍 WHERE 조건 처리 중...");
|
||
|
||
// 🔑 Primary Key 자동 추가 (context-data 모드)
|
||
console.log("🔑 context-data 모드: Primary Key 자동 추가");
|
||
const enhancedWhereConditions = await this.enhanceWhereConditionsWithPK(
|
||
whereConditions,
|
||
data,
|
||
targetTable
|
||
);
|
||
|
||
const whereResult = this.buildWhereClause(enhancedWhereConditions, data, 1);
|
||
|
||
const sql = `DELETE FROM ${targetTable} ${whereResult.clause} RETURNING *`;
|
||
|
||
console.log("📝 실행할 SQL:", sql);
|
||
console.log("📊 바인딩 값:", whereResult.values);
|
||
|
||
const result = await txClient.query(sql, whereResult.values);
|
||
deletedCount += result.rowCount || 0;
|
||
|
||
// 🔥 RETURNING으로 받은 삭제된 데이터 저장
|
||
if (result.rows && result.rows.length > 0) {
|
||
deletedDataArray.push(...result.rows);
|
||
}
|
||
}
|
||
|
||
logger.info(
|
||
`✅ DELETE 완료 (내부 DB): ${targetTable}, ${deletedCount}건`
|
||
);
|
||
|
||
// 🔥 삭제된 데이터 반환 (로그 기록 등에 사용)
|
||
return deletedDataArray;
|
||
};
|
||
|
||
// 🔥 클라이언트가 전달되었으면 사용, 없으면 독립 트랜잭션 생성
|
||
if (client) {
|
||
return executeDelete(client);
|
||
} else {
|
||
return transaction(executeDelete);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 외부 DB DELETE 실행
|
||
*/
|
||
private static async executeExternalDelete(
|
||
node: FlowNode,
|
||
inputData: any,
|
||
context: ExecutionContext
|
||
): Promise<any> {
|
||
const {
|
||
externalConnectionId,
|
||
externalDbType,
|
||
externalTargetTable,
|
||
whereConditions,
|
||
} = node.data;
|
||
|
||
if (!externalConnectionId || !externalTargetTable) {
|
||
throw new Error("외부 DB 커넥션 또는 테이블이 설정되지 않았습니다.");
|
||
}
|
||
|
||
logger.info(
|
||
`🔌 외부 DB DELETE 시작: ${externalDbType} - ${externalTargetTable}`
|
||
);
|
||
|
||
// 외부 DB 커넥터 생성
|
||
const connector = await this.createExternalConnector(
|
||
externalConnectionId,
|
||
externalDbType
|
||
);
|
||
|
||
try {
|
||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||
let deletedCount = 0;
|
||
const deletedDataArray: any[] = [];
|
||
|
||
for (const data of dataArray) {
|
||
const whereClauses: string[] = [];
|
||
const values: any[] = [];
|
||
let paramIndex = 1;
|
||
|
||
// WHERE 조건 생성
|
||
whereConditions?.forEach((condition: any) => {
|
||
const condValue = data[condition.field];
|
||
|
||
if (condition.operator === "IS NULL") {
|
||
whereClauses.push(`${condition.field} IS NULL`);
|
||
} else if (condition.operator === "IS NOT NULL") {
|
||
whereClauses.push(`${condition.field} IS NOT NULL`);
|
||
} else {
|
||
if (externalDbType.toLowerCase() === "oracle") {
|
||
whereClauses.push(
|
||
`${condition.field} ${condition.operator} :${paramIndex}`
|
||
);
|
||
} else if (
|
||
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
|
||
) {
|
||
whereClauses.push(`${condition.field} ${condition.operator} ?`);
|
||
} else if (externalDbType.toLowerCase() === "mssql") {
|
||
whereClauses.push(
|
||
`${condition.field} ${condition.operator} @p${paramIndex}`
|
||
);
|
||
} else {
|
||
whereClauses.push(
|
||
`${condition.field} ${condition.operator} $${paramIndex}`
|
||
);
|
||
}
|
||
values.push(condValue);
|
||
paramIndex++;
|
||
}
|
||
});
|
||
|
||
const whereClause =
|
||
whereClauses.length > 0 ? `WHERE ${whereClauses.join(" AND ")}` : "";
|
||
|
||
if (!whereClause) {
|
||
throw new Error(
|
||
"DELETE 작업에 WHERE 조건이 필요합니다. (전체 삭제 방지)"
|
||
);
|
||
}
|
||
|
||
// 🔥 삭제 전에 데이터 조회 (로그 기록 용도)
|
||
const selectSql = `SELECT * FROM ${externalTargetTable} ${whereClause}`;
|
||
const selectResult = await connector.executeQuery(selectSql, values);
|
||
if (selectResult && selectResult.length > 0) {
|
||
deletedDataArray.push(...selectResult);
|
||
}
|
||
|
||
// 실제 삭제 수행
|
||
const deleteSql = `DELETE FROM ${externalTargetTable} ${whereClause}`;
|
||
const result = await connector.executeQuery(deleteSql, values);
|
||
deletedCount += result.rowCount || result.affectedRows || 0;
|
||
}
|
||
|
||
// 🔥 Oracle의 경우 명시적 COMMIT
|
||
await this.commitExternalTransaction(
|
||
connector,
|
||
externalDbType,
|
||
deletedCount
|
||
);
|
||
|
||
logger.info(
|
||
`✅ DELETE 완료 (외부 DB): ${externalTargetTable}, ${deletedCount}건`
|
||
);
|
||
|
||
// 🔥 삭제된 데이터 반환
|
||
return deletedDataArray;
|
||
} catch (error) {
|
||
// 🔥 Oracle의 경우 오류 시 ROLLBACK
|
||
await this.rollbackExternalTransaction(connector, externalDbType);
|
||
throw error;
|
||
} finally {
|
||
await connector.disconnect();
|
||
}
|
||
}
|
||
|
||
/**
|
||
* REST API DELETE 실행 (DELETE 요청)
|
||
*/
|
||
private static async executeApiDelete(
|
||
node: FlowNode,
|
||
inputData: any,
|
||
context: ExecutionContext
|
||
): Promise<any> {
|
||
const { apiEndpoint, apiAuthType, apiAuthConfig, apiHeaders } = node.data;
|
||
|
||
if (!apiEndpoint) {
|
||
throw new Error("API 엔드포인트가 설정되지 않았습니다.");
|
||
}
|
||
|
||
logger.info(`🌐 REST API DELETE 시작: ${apiEndpoint}`);
|
||
|
||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||
const results: any[] = [];
|
||
|
||
for (const data of dataArray) {
|
||
// 헤더 설정
|
||
const headers: any = { ...apiHeaders };
|
||
|
||
// 인증 헤더 추가
|
||
if (apiAuthType === "bearer" && apiAuthConfig?.token) {
|
||
headers["Authorization"] = `Bearer ${apiAuthConfig.token}`;
|
||
} else if (
|
||
apiAuthType === "basic" &&
|
||
apiAuthConfig?.username &&
|
||
apiAuthConfig?.password
|
||
) {
|
||
const credentials = Buffer.from(
|
||
`${apiAuthConfig.username}:${apiAuthConfig.password}`
|
||
).toString("base64");
|
||
headers["Authorization"] = `Basic ${credentials}`;
|
||
} else if (apiAuthType === "apikey" && apiAuthConfig?.apiKey) {
|
||
const headerName = apiAuthConfig.apiKeyHeader || "X-API-Key";
|
||
headers[headerName] = apiAuthConfig.apiKey;
|
||
}
|
||
|
||
// DELETE는 일반적으로 URL 파라미터 또는 경로에 ID 포함
|
||
// 템플릿 변수 치환 지원 (예: /api/users/{{id}})
|
||
const url = this.replaceTemplateVariables(apiEndpoint, data);
|
||
|
||
try {
|
||
const response = await axios({
|
||
method: "DELETE",
|
||
url,
|
||
headers,
|
||
timeout: 30000,
|
||
});
|
||
|
||
results.push({
|
||
status: response.status,
|
||
data: response.data,
|
||
});
|
||
} catch (error: any) {
|
||
logger.error(
|
||
`❌ API 요청 실패: ${error.response?.status || error.message}`
|
||
);
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
logger.info(`✅ REST API DELETE 완료: ${results.length}건`);
|
||
|
||
return { results };
|
||
}
|
||
|
||
/**
|
||
* UPSERT 액션 노드 실행
|
||
*/
|
||
private static async executeUpsertAction(
|
||
node: FlowNode,
|
||
inputData: any,
|
||
context: ExecutionContext,
|
||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||
): Promise<any> {
|
||
const { targetType } = node.data;
|
||
|
||
// 🔥 타겟 타입별 분기
|
||
switch (targetType) {
|
||
case "internal":
|
||
return this.executeInternalUpsert(node, inputData, context, client);
|
||
|
||
case "external":
|
||
return this.executeExternalUpsert(node, inputData, context);
|
||
|
||
case "api":
|
||
return this.executeApiUpsert(node, inputData, context);
|
||
|
||
default:
|
||
// 하위 호환성: targetType이 없으면 internal로 간주
|
||
logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
|
||
return this.executeInternalUpsert(node, inputData, context, client);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 내부 DB UPSERT 실행 (로직 기반)
|
||
* DB 제약 조건 없이 SELECT → UPDATE or INSERT 방식으로 구현
|
||
*/
|
||
private static async executeInternalUpsert(
|
||
node: FlowNode,
|
||
inputData: any,
|
||
context: ExecutionContext,
|
||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||
): Promise<any> {
|
||
const { targetTable, fieldMappings, conflictKeys } = node.data;
|
||
|
||
if (!targetTable || !fieldMappings || fieldMappings.length === 0) {
|
||
throw new Error("UPSERT 액션에 필수 설정이 누락되었습니다.");
|
||
}
|
||
|
||
if (!conflictKeys || conflictKeys.length === 0) {
|
||
throw new Error("UPSERT 액션에 충돌 키(Conflict Keys)가 필요합니다.");
|
||
}
|
||
|
||
logger.info(`🔀 UPSERT 노드 실행: ${targetTable}`);
|
||
console.log(
|
||
"📥 입력 데이터 타입:",
|
||
typeof inputData,
|
||
Array.isArray(inputData) ? `배열(${inputData.length}건)` : "단일 객체"
|
||
);
|
||
|
||
if (inputData && inputData.length > 0) {
|
||
console.log("📄 첫 번째 입력 데이터:");
|
||
console.log(JSON.stringify(inputData[0], null, 2));
|
||
console.log("🔑 입력 데이터 필드명:", Object.keys(inputData[0]));
|
||
}
|
||
console.log("🔑 충돌 키:", conflictKeys);
|
||
|
||
// 🔥 트랜잭션 클라이언트가 있으면 사용, 없으면 독립 트랜잭션
|
||
const executeUpsert = async (txClient: any) => {
|
||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||
let insertedCount = 0;
|
||
let updatedCount = 0;
|
||
|
||
for (const data of dataArray) {
|
||
// 1. 충돌 키 값 추출
|
||
const conflictKeyValues: Record<string, any> = {};
|
||
conflictKeys.forEach((key: string) => {
|
||
const mapping = fieldMappings.find((m: any) => m.targetField === key);
|
||
if (mapping) {
|
||
conflictKeyValues[key] =
|
||
mapping.staticValue !== undefined
|
||
? mapping.staticValue
|
||
: data[mapping.sourceField];
|
||
}
|
||
});
|
||
|
||
// 2. 존재 여부 확인 (SELECT)
|
||
const whereConditions = conflictKeys
|
||
.map((key: string, index: number) => `${key} = $${index + 1}`)
|
||
.join(" AND ");
|
||
const whereValues = conflictKeys.map(
|
||
(key: string) => conflictKeyValues[key]
|
||
);
|
||
|
||
console.log("🔍 존재 여부 확인 - WHERE 조건:", whereConditions);
|
||
console.log("🔍 존재 여부 확인 - 바인딩 값:", whereValues);
|
||
|
||
const checkSql = `SELECT 1 FROM ${targetTable} WHERE ${whereConditions} LIMIT 1`;
|
||
const existingRow = await txClient.query(checkSql, whereValues);
|
||
|
||
if (existingRow.rows.length > 0) {
|
||
// 3-A. 존재하면 UPDATE
|
||
const setClauses: string[] = [];
|
||
const updateValues: any[] = [];
|
||
let paramIndex = 1;
|
||
|
||
fieldMappings.forEach((mapping: any) => {
|
||
// 충돌 키가 아닌 필드만 UPDATE
|
||
if (!conflictKeys.includes(mapping.targetField)) {
|
||
const value =
|
||
mapping.staticValue !== undefined
|
||
? mapping.staticValue
|
||
: data[mapping.sourceField];
|
||
setClauses.push(`${mapping.targetField} = $${paramIndex}`);
|
||
updateValues.push(value);
|
||
paramIndex++;
|
||
}
|
||
});
|
||
|
||
// WHERE 조건 생성 (파라미터 인덱스 이어서)
|
||
const updateWhereConditions = conflictKeys
|
||
.map(
|
||
(key: string, index: number) => `${key} = $${paramIndex + index}`
|
||
)
|
||
.join(" AND ");
|
||
|
||
// WHERE 조건 값 추가
|
||
whereValues.forEach((val: any) => {
|
||
updateValues.push(val);
|
||
});
|
||
|
||
const updateSql = `
|
||
UPDATE ${targetTable}
|
||
SET ${setClauses.join(", ")}
|
||
WHERE ${updateWhereConditions}
|
||
`;
|
||
|
||
logger.info(`🔄 UPDATE 실행:`, {
|
||
table: targetTable,
|
||
conflictKeys,
|
||
conflictKeyValues,
|
||
sql: updateSql,
|
||
values: updateValues,
|
||
});
|
||
|
||
await txClient.query(updateSql, updateValues);
|
||
updatedCount++;
|
||
} else {
|
||
// 3-B. 없으면 INSERT
|
||
const columns: string[] = [];
|
||
const values: any[] = [];
|
||
|
||
fieldMappings.forEach((mapping: any) => {
|
||
const value =
|
||
mapping.staticValue !== undefined
|
||
? mapping.staticValue
|
||
: data[mapping.sourceField];
|
||
columns.push(mapping.targetField);
|
||
values.push(value);
|
||
});
|
||
|
||
const placeholders = values.map((_, i) => `$${i + 1}`).join(", ");
|
||
const insertSql = `
|
||
INSERT INTO ${targetTable} (${columns.join(", ")})
|
||
VALUES (${placeholders})
|
||
`;
|
||
|
||
logger.info(`➕ INSERT 실행:`, {
|
||
table: targetTable,
|
||
conflictKeys,
|
||
conflictKeyValues,
|
||
});
|
||
|
||
await txClient.query(insertSql, values);
|
||
insertedCount++;
|
||
}
|
||
}
|
||
|
||
logger.info(
|
||
`✅ UPSERT 완료 (내부 DB): ${targetTable}, INSERT ${insertedCount}건, UPDATE ${updatedCount}건`
|
||
);
|
||
|
||
return {
|
||
insertedCount,
|
||
updatedCount,
|
||
totalCount: insertedCount + updatedCount,
|
||
};
|
||
};
|
||
|
||
// 🔥 클라이언트가 전달되었으면 사용, 아니면 독립 트랜잭션 생성
|
||
if (client) {
|
||
return executeUpsert(client);
|
||
} else {
|
||
return transaction(executeUpsert);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 외부 DB UPSERT 실행 (로직 기반)
|
||
*/
|
||
private static async executeExternalUpsert(
|
||
node: FlowNode,
|
||
inputData: any,
|
||
context: ExecutionContext
|
||
): Promise<any> {
|
||
const {
|
||
externalConnectionId,
|
||
externalDbType,
|
||
externalTargetTable,
|
||
fieldMappings,
|
||
conflictKeys,
|
||
} = node.data;
|
||
|
||
if (!externalConnectionId || !externalTargetTable) {
|
||
throw new Error("외부 DB 커넥션 또는 테이블이 설정되지 않았습니다.");
|
||
}
|
||
|
||
if (!fieldMappings || fieldMappings.length === 0) {
|
||
throw new Error("UPSERT 액션에 필수 설정이 누락되었습니다.");
|
||
}
|
||
|
||
if (!conflictKeys || conflictKeys.length === 0) {
|
||
throw new Error("UPSERT 액션에 충돌 키(Conflict Keys)가 필요합니다.");
|
||
}
|
||
|
||
logger.info(
|
||
`🔌 외부 DB UPSERT 시작: ${externalDbType} - ${externalTargetTable}`
|
||
);
|
||
|
||
// 외부 DB 커넥터 생성
|
||
const connector = await this.createExternalConnector(
|
||
externalConnectionId,
|
||
externalDbType
|
||
);
|
||
|
||
try {
|
||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||
let insertedCount = 0;
|
||
let updatedCount = 0;
|
||
|
||
for (const data of dataArray) {
|
||
// 1. 충돌 키 값 추출
|
||
const conflictKeyValues: Record<string, any> = {};
|
||
conflictKeys.forEach((key: string) => {
|
||
const mapping = fieldMappings.find((m: any) => m.targetField === key);
|
||
if (mapping) {
|
||
conflictKeyValues[key] =
|
||
mapping.staticValue !== undefined
|
||
? mapping.staticValue
|
||
: data[mapping.sourceField];
|
||
}
|
||
});
|
||
|
||
// 2. 존재 여부 확인 (SELECT)
|
||
const whereClauses: string[] = [];
|
||
const whereValues: any[] = [];
|
||
let paramIndex = 1;
|
||
|
||
conflictKeys.forEach((key: string) => {
|
||
if (externalDbType.toLowerCase() === "oracle") {
|
||
whereClauses.push(`${key} = :${paramIndex}`);
|
||
} else if (
|
||
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
|
||
) {
|
||
whereClauses.push(`${key} = ?`);
|
||
} else if (externalDbType.toLowerCase() === "mssql") {
|
||
whereClauses.push(`${key} = @p${paramIndex}`);
|
||
} else {
|
||
whereClauses.push(`${key} = $${paramIndex}`);
|
||
}
|
||
whereValues.push(conflictKeyValues[key]);
|
||
paramIndex++;
|
||
});
|
||
|
||
const checkSql = `SELECT * FROM ${externalTargetTable} WHERE ${whereClauses.join(" AND ")} LIMIT 1`;
|
||
const existingRow = await connector.executeQuery(checkSql, whereValues);
|
||
|
||
const hasExistingRow =
|
||
existingRow.rows?.length > 0 || existingRow.length > 0;
|
||
|
||
if (hasExistingRow) {
|
||
// 3-A. 존재하면 UPDATE
|
||
const setClauses: string[] = [];
|
||
const updateValues: any[] = [];
|
||
paramIndex = 1;
|
||
|
||
fieldMappings.forEach((mapping: any) => {
|
||
if (!conflictKeys.includes(mapping.targetField)) {
|
||
const value =
|
||
mapping.staticValue !== undefined
|
||
? mapping.staticValue
|
||
: data[mapping.sourceField];
|
||
|
||
if (externalDbType.toLowerCase() === "oracle") {
|
||
setClauses.push(`${mapping.targetField} = :${paramIndex}`);
|
||
} else if (
|
||
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
|
||
) {
|
||
setClauses.push(`${mapping.targetField} = ?`);
|
||
} else if (externalDbType.toLowerCase() === "mssql") {
|
||
setClauses.push(`${mapping.targetField} = @p${paramIndex}`);
|
||
} else {
|
||
setClauses.push(`${mapping.targetField} = $${paramIndex}`);
|
||
}
|
||
|
||
updateValues.push(value);
|
||
paramIndex++;
|
||
}
|
||
});
|
||
|
||
// WHERE 조건 생성
|
||
const updateWhereClauses: string[] = [];
|
||
conflictKeys.forEach((key: string) => {
|
||
if (externalDbType.toLowerCase() === "oracle") {
|
||
updateWhereClauses.push(`${key} = :${paramIndex}`);
|
||
} else if (
|
||
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
|
||
) {
|
||
updateWhereClauses.push(`${key} = ?`);
|
||
} else if (externalDbType.toLowerCase() === "mssql") {
|
||
updateWhereClauses.push(`${key} = @p${paramIndex}`);
|
||
} else {
|
||
updateWhereClauses.push(`${key} = $${paramIndex}`);
|
||
}
|
||
updateValues.push(conflictKeyValues[key]);
|
||
paramIndex++;
|
||
});
|
||
|
||
const updateSql = `UPDATE ${externalTargetTable} SET ${setClauses.join(", ")} WHERE ${updateWhereClauses.join(" AND ")}`;
|
||
|
||
await connector.executeQuery(updateSql, updateValues);
|
||
updatedCount++;
|
||
} else {
|
||
// 3-B. 없으면 INSERT
|
||
const columns: string[] = [];
|
||
const values: any[] = [];
|
||
|
||
fieldMappings.forEach((mapping: any) => {
|
||
const value =
|
||
mapping.staticValue !== undefined
|
||
? mapping.staticValue
|
||
: data[mapping.sourceField];
|
||
columns.push(mapping.targetField);
|
||
values.push(value);
|
||
});
|
||
|
||
let insertSql: string;
|
||
if (externalDbType.toLowerCase() === "oracle") {
|
||
const placeholders = columns.map((_, i) => `:${i + 1}`).join(", ");
|
||
insertSql = `INSERT INTO ${externalTargetTable} (${columns.join(", ")}) VALUES (${placeholders})`;
|
||
} else if (
|
||
["mysql", "mariadb"].includes(externalDbType.toLowerCase())
|
||
) {
|
||
const placeholders = columns.map(() => "?").join(", ");
|
||
insertSql = `INSERT INTO ${externalTargetTable} (${columns.join(", ")}) VALUES (${placeholders})`;
|
||
} else if (externalDbType.toLowerCase() === "mssql") {
|
||
const placeholders = columns.map((_, i) => `@p${i + 1}`).join(", ");
|
||
insertSql = `INSERT INTO ${externalTargetTable} (${columns.join(", ")}) VALUES (${placeholders})`;
|
||
} else {
|
||
const placeholders = columns.map((_, i) => `$${i + 1}`).join(", ");
|
||
insertSql = `INSERT INTO ${externalTargetTable} (${columns.join(", ")}) VALUES (${placeholders})`;
|
||
}
|
||
|
||
await connector.executeQuery(insertSql, values);
|
||
insertedCount++;
|
||
}
|
||
}
|
||
|
||
// 🔥 Oracle의 경우 명시적 COMMIT
|
||
await this.commitExternalTransaction(
|
||
connector,
|
||
externalDbType,
|
||
insertedCount + updatedCount
|
||
);
|
||
|
||
logger.info(
|
||
`✅ UPSERT 완료 (외부 DB): ${externalTargetTable}, INSERT ${insertedCount}건, UPDATE ${updatedCount}건`
|
||
);
|
||
|
||
return {
|
||
insertedCount,
|
||
updatedCount,
|
||
totalCount: insertedCount + updatedCount,
|
||
};
|
||
} catch (error) {
|
||
// 🔥 Oracle의 경우 오류 시 ROLLBACK
|
||
await this.rollbackExternalTransaction(connector, externalDbType);
|
||
throw error;
|
||
} finally {
|
||
await connector.disconnect();
|
||
}
|
||
}
|
||
|
||
/**
|
||
* REST API UPSERT 실행 (POST/PUT 요청)
|
||
* API 응답에 따라 INSERT/UPDATE 판단
|
||
*/
|
||
private static async executeApiUpsert(
|
||
node: FlowNode,
|
||
inputData: any,
|
||
context: ExecutionContext
|
||
): Promise<any> {
|
||
const {
|
||
apiEndpoint,
|
||
apiMethod,
|
||
apiAuthType,
|
||
apiAuthConfig,
|
||
apiHeaders,
|
||
apiBodyTemplate,
|
||
fieldMappings,
|
||
} = node.data;
|
||
|
||
if (!apiEndpoint) {
|
||
throw new Error("API 엔드포인트가 설정되지 않았습니다.");
|
||
}
|
||
|
||
logger.info(`🌐 REST API UPSERT 시작: ${apiMethod} ${apiEndpoint}`);
|
||
|
||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||
const results: any[] = [];
|
||
|
||
for (const data of dataArray) {
|
||
// 헤더 설정
|
||
const headers: any = { ...apiHeaders };
|
||
|
||
// 인증 헤더 추가
|
||
if (apiAuthType === "bearer" && apiAuthConfig?.token) {
|
||
headers["Authorization"] = `Bearer ${apiAuthConfig.token}`;
|
||
} else if (
|
||
apiAuthType === "basic" &&
|
||
apiAuthConfig?.username &&
|
||
apiAuthConfig?.password
|
||
) {
|
||
const credentials = Buffer.from(
|
||
`${apiAuthConfig.username}:${apiAuthConfig.password}`
|
||
).toString("base64");
|
||
headers["Authorization"] = `Basic ${credentials}`;
|
||
} else if (apiAuthType === "apikey" && apiAuthConfig?.apiKey) {
|
||
const headerName = apiAuthConfig.apiKeyHeader || "X-API-Key";
|
||
headers[headerName] = apiAuthConfig.apiKey;
|
||
}
|
||
|
||
if (!headers["Content-Type"]) {
|
||
headers["Content-Type"] = "application/json";
|
||
}
|
||
|
||
// 바디 생성
|
||
let body: any;
|
||
|
||
if (apiBodyTemplate) {
|
||
body = this.replaceTemplateVariables(apiBodyTemplate, data);
|
||
} else if (fieldMappings && fieldMappings.length > 0) {
|
||
body = {};
|
||
fieldMappings.forEach((mapping: any) => {
|
||
const value =
|
||
mapping.staticValue !== undefined
|
||
? mapping.staticValue
|
||
: data[mapping.sourceField];
|
||
body[mapping.targetField] = value;
|
||
});
|
||
} else {
|
||
body = data;
|
||
}
|
||
|
||
try {
|
||
// UPSERT는 일반적으로 PUT 메서드 사용 (멱등성)
|
||
const response = await axios({
|
||
method: apiMethod || "PUT",
|
||
url: apiEndpoint,
|
||
headers,
|
||
data: body,
|
||
timeout: 30000,
|
||
});
|
||
|
||
results.push({
|
||
status: response.status,
|
||
data: response.data,
|
||
});
|
||
} catch (error: any) {
|
||
logger.error(
|
||
`❌ API 요청 실패: ${error.response?.status || error.message}`
|
||
);
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
logger.info(`✅ REST API UPSERT 완료: ${results.length}건`);
|
||
|
||
return { results };
|
||
}
|
||
|
||
/**
|
||
* 조건 노드 실행
|
||
*/
|
||
private static async executeCondition(
|
||
node: FlowNode,
|
||
inputData: any,
|
||
context: ExecutionContext
|
||
): Promise<any> {
|
||
const { conditions, logic } = node.data;
|
||
|
||
logger.info(
|
||
`🔍 조건 노드 실행 - inputData 타입: ${typeof inputData}, 배열 여부: ${Array.isArray(inputData)}, 길이: ${Array.isArray(inputData) ? inputData.length : "N/A"}`
|
||
);
|
||
logger.info(`🔍 조건 개수: ${conditions?.length || 0}, 로직: ${logic}`);
|
||
|
||
if (inputData) {
|
||
console.log(
|
||
"📥 조건 노드 입력 데이터:",
|
||
JSON.stringify(inputData, null, 2).substring(0, 500)
|
||
);
|
||
} else {
|
||
console.log("⚠️ 조건 노드 입력 데이터가 없습니다!");
|
||
}
|
||
|
||
// 조건이 없으면 모든 데이터 통과
|
||
if (!conditions || conditions.length === 0) {
|
||
logger.info("⚠️ 조건이 설정되지 않음 - 모든 데이터 통과");
|
||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||
return {
|
||
conditionResult: true,
|
||
trueData: dataArray,
|
||
falseData: [],
|
||
allData: dataArray,
|
||
};
|
||
}
|
||
|
||
// inputData가 배열인 경우 각 항목을 필터링
|
||
if (Array.isArray(inputData)) {
|
||
const trueData: any[] = [];
|
||
const falseData: any[] = [];
|
||
|
||
inputData.forEach((item: any) => {
|
||
const results = conditions.map((condition: any) => {
|
||
const fieldValue = item[condition.field];
|
||
|
||
let compareValue = condition.value;
|
||
if (condition.valueType === "field") {
|
||
compareValue = item[condition.value];
|
||
logger.info(
|
||
`🔄 필드 참조 비교: ${condition.field} (${fieldValue}) vs ${condition.value} (${compareValue})`
|
||
);
|
||
} else {
|
||
logger.info(
|
||
`📊 고정값 비교: ${condition.field} (${fieldValue}) vs ${compareValue}`
|
||
);
|
||
}
|
||
|
||
return this.evaluateCondition(
|
||
fieldValue,
|
||
condition.operator,
|
||
compareValue
|
||
);
|
||
});
|
||
|
||
const result =
|
||
logic === "OR"
|
||
? results.some((r: boolean) => r)
|
||
: results.every((r: boolean) => r);
|
||
|
||
if (result) {
|
||
trueData.push(item);
|
||
} else {
|
||
falseData.push(item);
|
||
}
|
||
});
|
||
|
||
logger.info(
|
||
`🔍 조건 필터링 결과: TRUE ${trueData.length}건 / FALSE ${falseData.length}건 (${logic} 로직)`
|
||
);
|
||
|
||
return {
|
||
conditionResult: trueData.length > 0,
|
||
trueData,
|
||
falseData,
|
||
allData: inputData,
|
||
};
|
||
}
|
||
|
||
// 단일 객체인 경우
|
||
const results = conditions.map((condition: any) => {
|
||
const fieldValue = inputData[condition.field];
|
||
|
||
let compareValue = condition.value;
|
||
if (condition.valueType === "field") {
|
||
compareValue = inputData[condition.value];
|
||
logger.info(
|
||
`🔄 필드 참조 비교: ${condition.field} (${fieldValue}) vs ${condition.value} (${compareValue})`
|
||
);
|
||
} else {
|
||
logger.info(
|
||
`📊 고정값 비교: ${condition.field} (${fieldValue}) vs ${compareValue}`
|
||
);
|
||
}
|
||
|
||
return this.evaluateCondition(
|
||
fieldValue,
|
||
condition.operator,
|
||
compareValue
|
||
);
|
||
});
|
||
|
||
const result =
|
||
logic === "OR"
|
||
? results.some((r: boolean) => r)
|
||
: results.every((r: boolean) => r);
|
||
|
||
logger.info(`🔍 조건 평가 결과: ${result} (${logic} 로직)`);
|
||
|
||
// ⚠️ 조건 노드는 TRUE/FALSE 브랜치를 위한 특별한 처리 필요
|
||
// 조건 결과를 저장하고, 원본 데이터는 항상 반환
|
||
// 다음 노드에서 sourceHandle을 기반으로 필터링됨
|
||
return {
|
||
conditionResult: result,
|
||
trueData: result ? [inputData] : [],
|
||
falseData: result ? [] : [inputData],
|
||
allData: [inputData], // 일단 모든 데이터 전달
|
||
};
|
||
}
|
||
|
||
/**
|
||
* WHERE 절 생성
|
||
*/
|
||
/**
|
||
* 테이블의 Primary Key 컬럼 조회 (내부 DB - PostgreSQL)
|
||
*/
|
||
private static async getPrimaryKeyColumns(
|
||
tableName: string,
|
||
schema: string = "public"
|
||
): Promise<string[]> {
|
||
const sql = `
|
||
SELECT a.attname AS column_name
|
||
FROM pg_index i
|
||
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
|
||
WHERE i.indrelid = $1::regclass
|
||
AND i.indisprimary
|
||
ORDER BY array_position(i.indkey, a.attnum);
|
||
`;
|
||
|
||
const fullTableName = schema ? `${schema}.${tableName}` : tableName;
|
||
|
||
try {
|
||
const result = await query(sql, [fullTableName]);
|
||
const pkColumns = result.map((row: any) => row.column_name);
|
||
|
||
if (pkColumns.length > 0) {
|
||
console.log(`🔑 테이블 ${tableName}의 Primary Key: ${pkColumns.join(", ")}`);
|
||
} else {
|
||
console.log(`⚠️ 테이블 ${tableName}에 Primary Key가 없습니다`);
|
||
}
|
||
|
||
return pkColumns;
|
||
} catch (error) {
|
||
console.error(`❌ Primary Key 조회 실패 (${tableName}):`, error);
|
||
return [];
|
||
}
|
||
}
|
||
|
||
/**
|
||
* WHERE 조건에 Primary Key 자동 추가 (컨텍스트 데이터 사용 시)
|
||
*
|
||
* 테이블의 실제 Primary Key를 자동으로 감지하여 WHERE 조건에 추가
|
||
*/
|
||
private static async enhanceWhereConditionsWithPK(
|
||
whereConditions: any[],
|
||
data: any,
|
||
tableName: string,
|
||
schema: string = "public"
|
||
): Promise<any[]> {
|
||
if (!data) {
|
||
console.log("⚠️ 입력 데이터가 없어 WHERE 조건 자동 추가 불가");
|
||
return whereConditions || [];
|
||
}
|
||
|
||
// 🔑 테이블의 실제 Primary Key 컬럼 조회
|
||
const pkColumns = await this.getPrimaryKeyColumns(tableName, schema);
|
||
|
||
if (pkColumns.length === 0) {
|
||
console.log(`⚠️ 테이블 ${tableName}에 Primary Key가 없어 자동 추가 불가`);
|
||
return whereConditions || [];
|
||
}
|
||
|
||
// 🔍 데이터에 모든 PK 컬럼이 있는지 확인
|
||
const missingPKColumns = pkColumns.filter(col =>
|
||
data[col] === undefined || data[col] === null
|
||
);
|
||
|
||
if (missingPKColumns.length > 0) {
|
||
console.log(
|
||
`⚠️ 입력 데이터에 Primary Key 컬럼이 없어 자동 추가 불가: ${missingPKColumns.join(", ")}`
|
||
);
|
||
return whereConditions || [];
|
||
}
|
||
|
||
// 🔍 이미 WHERE 조건에 모든 PK가 포함되어 있는지 확인
|
||
const existingFields = new Set(
|
||
(whereConditions || []).map((cond: any) => cond.field)
|
||
);
|
||
const allPKsExist = pkColumns.every(col =>
|
||
existingFields.has(col) || existingFields.has(`${tableName}.${col}`)
|
||
);
|
||
|
||
if (allPKsExist) {
|
||
console.log("✅ WHERE 조건에 이미 모든 Primary Key 포함, 추가하지 않음");
|
||
return whereConditions || [];
|
||
}
|
||
|
||
// 🔥 Primary Key 조건들을 맨 앞에 추가
|
||
const pkConditions = pkColumns.map(col => ({
|
||
field: col,
|
||
operator: 'EQUALS',
|
||
value: data[col]
|
||
}));
|
||
|
||
const enhanced = [...pkConditions, ...(whereConditions || [])];
|
||
|
||
const pkValues = pkColumns.map(col => `${col} = ${data[col]}`).join(", ");
|
||
console.log(`🔑 WHERE 조건에 Primary Key 자동 추가: ${pkValues}`);
|
||
|
||
return enhanced;
|
||
}
|
||
|
||
private static buildWhereClause(
|
||
conditions: any[],
|
||
data?: any,
|
||
startIndex: number = 1
|
||
): { clause: string; values: any[] } {
|
||
if (!conditions || conditions.length === 0) {
|
||
return { clause: "", values: [] };
|
||
}
|
||
|
||
const values: any[] = [];
|
||
const clauses = conditions.map((condition, index) => {
|
||
const value = data ? data[condition.field] : condition.value;
|
||
values.push(value);
|
||
|
||
// 연산자를 SQL 문법으로 변환
|
||
let sqlOperator = condition.operator;
|
||
switch (condition.operator.toUpperCase()) {
|
||
case "EQUALS":
|
||
sqlOperator = "=";
|
||
break;
|
||
case "NOT_EQUALS":
|
||
case "NOTEQUALS":
|
||
sqlOperator = "!=";
|
||
break;
|
||
case "GREATER_THAN":
|
||
case "GREATERTHAN":
|
||
sqlOperator = ">";
|
||
break;
|
||
case "LESS_THAN":
|
||
case "LESSTHAN":
|
||
sqlOperator = "<";
|
||
break;
|
||
case "GREATER_THAN_OR_EQUAL":
|
||
case "GREATERTHANOREQUAL":
|
||
sqlOperator = ">=";
|
||
break;
|
||
case "LESS_THAN_OR_EQUAL":
|
||
case "LESSTHANOREQUAL":
|
||
sqlOperator = "<=";
|
||
break;
|
||
case "LIKE":
|
||
sqlOperator = "LIKE";
|
||
break;
|
||
case "NOT_LIKE":
|
||
case "NOTLIKE":
|
||
sqlOperator = "NOT LIKE";
|
||
break;
|
||
case "IN":
|
||
sqlOperator = "IN";
|
||
break;
|
||
case "NOT_IN":
|
||
case "NOTIN":
|
||
sqlOperator = "NOT IN";
|
||
break;
|
||
case "IS_NULL":
|
||
case "ISNULL":
|
||
return `${condition.field} IS NULL`;
|
||
case "IS_NOT_NULL":
|
||
case "ISNOTNULL":
|
||
return `${condition.field} IS NOT NULL`;
|
||
default:
|
||
// 이미 SQL 문법인 경우 (=, !=, >, < 등)
|
||
sqlOperator = condition.operator;
|
||
}
|
||
|
||
return `${condition.field} ${sqlOperator} $${startIndex + index}`;
|
||
});
|
||
|
||
return { clause: `WHERE ${clauses.join(" AND ")}`, values };
|
||
}
|
||
|
||
/**
|
||
* 조건 평가
|
||
*/
|
||
private static evaluateCondition(
|
||
fieldValue: any,
|
||
operator: string,
|
||
expectedValue: any
|
||
): boolean {
|
||
// NULL 체크
|
||
if (operator === "IS_NULL" || operator === "isNull") {
|
||
return (
|
||
fieldValue === null || fieldValue === undefined || fieldValue === ""
|
||
);
|
||
}
|
||
if (operator === "IS_NOT_NULL" || operator === "isNotNull") {
|
||
return (
|
||
fieldValue !== null && fieldValue !== undefined && fieldValue !== ""
|
||
);
|
||
}
|
||
|
||
// 비교 연산자: 타입 변환
|
||
const normalizedOperator = operator.toUpperCase();
|
||
|
||
switch (normalizedOperator) {
|
||
case "EQUALS":
|
||
case "=":
|
||
return fieldValue == expectedValue; // 느슨한 비교
|
||
|
||
case "NOT_EQUALS":
|
||
case "NOTEQUALS":
|
||
case "!=":
|
||
return fieldValue != expectedValue;
|
||
|
||
case "GREATER_THAN":
|
||
case "GREATERTHAN":
|
||
case ">":
|
||
return Number(fieldValue) > Number(expectedValue);
|
||
|
||
case "LESS_THAN":
|
||
case "LESSTHAN":
|
||
case "<":
|
||
return Number(fieldValue) < Number(expectedValue);
|
||
|
||
case "GREATER_THAN_OR_EQUAL":
|
||
case "GREATERTHANOREQUAL":
|
||
case ">=":
|
||
return Number(fieldValue) >= Number(expectedValue);
|
||
|
||
case "LESS_THAN_OR_EQUAL":
|
||
case "LESSTHANOREQUAL":
|
||
case "<=":
|
||
return Number(fieldValue) <= Number(expectedValue);
|
||
|
||
case "LIKE":
|
||
case "CONTAINS":
|
||
return String(fieldValue)
|
||
.toLowerCase()
|
||
.includes(String(expectedValue).toLowerCase());
|
||
|
||
case "NOT_LIKE":
|
||
case "NOTLIKE":
|
||
return !String(fieldValue)
|
||
.toLowerCase()
|
||
.includes(String(expectedValue).toLowerCase());
|
||
|
||
case "IN":
|
||
if (Array.isArray(expectedValue)) {
|
||
return expectedValue.includes(fieldValue);
|
||
}
|
||
// 쉼표로 구분된 문자열
|
||
const inValues = String(expectedValue)
|
||
.split(",")
|
||
.map((v) => v.trim());
|
||
return inValues.includes(String(fieldValue));
|
||
|
||
case "NOT_IN":
|
||
case "NOTIN":
|
||
if (Array.isArray(expectedValue)) {
|
||
return !expectedValue.includes(fieldValue);
|
||
}
|
||
const notInValues = String(expectedValue)
|
||
.split(",")
|
||
.map((v) => v.trim());
|
||
return !notInValues.includes(String(fieldValue));
|
||
|
||
default:
|
||
logger.warn(`⚠️ 지원되지 않는 연산자: ${operator}`);
|
||
return false;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 실행 결과 생성
|
||
*/
|
||
private static generateExecutionResult(
|
||
nodes: FlowNode[],
|
||
context: ExecutionContext,
|
||
executionTime: number
|
||
): ExecutionResult {
|
||
const nodeSummaries: NodeExecutionSummary[] = nodes.map((node) => {
|
||
const result = context.nodeResults.get(node.id);
|
||
return {
|
||
nodeId: node.id,
|
||
nodeName: node.data.displayName || node.id,
|
||
nodeType: node.type,
|
||
status: result?.status || "pending",
|
||
duration: result?.endTime
|
||
? result.endTime - result.startTime
|
||
: undefined,
|
||
error: result?.error?.message,
|
||
};
|
||
});
|
||
|
||
const summary = {
|
||
total: nodes.length,
|
||
success: nodeSummaries.filter((n) => n.status === "success").length,
|
||
failed: nodeSummaries.filter((n) => n.status === "failed").length,
|
||
skipped: nodeSummaries.filter((n) => n.status === "skipped").length,
|
||
};
|
||
|
||
const success = summary.failed === 0;
|
||
|
||
// 실패한 노드 상세 로깅
|
||
if (!success) {
|
||
const failedNodes = nodeSummaries.filter((n) => n.status === "failed");
|
||
logger.error(
|
||
`❌ 실패한 노드들:`,
|
||
failedNodes.map((n) => ({
|
||
nodeId: n.nodeId,
|
||
nodeName: n.nodeName,
|
||
nodeType: n.nodeType,
|
||
error: n.error,
|
||
}))
|
||
);
|
||
}
|
||
|
||
return {
|
||
success,
|
||
message: success
|
||
? `플로우 실행 성공 (${summary.success}/${summary.total})`
|
||
: `플로우 실행 부분 실패 (성공: ${summary.success}, 실패: ${summary.failed})`,
|
||
executionTime,
|
||
nodes: nodeSummaries,
|
||
summary,
|
||
};
|
||
}
|
||
|
||
/**
|
||
* 데이터 변환 노드 실행
|
||
*/
|
||
private static async executeDataTransform(
|
||
node: FlowNode,
|
||
inputData: any,
|
||
context: ExecutionContext
|
||
): Promise<any[]> {
|
||
const { transformations } = node.data;
|
||
|
||
if (
|
||
!transformations ||
|
||
!Array.isArray(transformations) ||
|
||
transformations.length === 0
|
||
) {
|
||
logger.warn(`⚠️ 데이터 변환 노드에 변환 규칙이 없습니다: ${node.id}`);
|
||
return Array.isArray(inputData) ? inputData : [inputData];
|
||
}
|
||
|
||
// inputData를 배열로 정규화
|
||
const rows = Array.isArray(inputData) ? inputData : [inputData];
|
||
logger.info(
|
||
`🔄 데이터 변환 시작: ${rows.length}개 행, ${transformations.length}개 변환`
|
||
);
|
||
|
||
// 각 변환 규칙을 순차적으로 적용
|
||
let transformedRows = rows;
|
||
|
||
for (const transform of transformations) {
|
||
const transformType = transform.type;
|
||
logger.info(` 🔹 변환 적용: ${transformType}`);
|
||
|
||
transformedRows = this.applyTransformation(transformedRows, transform);
|
||
}
|
||
|
||
logger.info(`✅ 데이터 변환 완료: ${transformedRows.length}개 행`);
|
||
return transformedRows;
|
||
}
|
||
|
||
/**
|
||
* 단일 변환 규칙 적용
|
||
*/
|
||
private static applyTransformation(rows: any[], transform: any): any[] {
|
||
const {
|
||
type,
|
||
sourceField,
|
||
targetField,
|
||
delimiter,
|
||
separator,
|
||
searchValue,
|
||
replaceValue,
|
||
splitIndex,
|
||
castType,
|
||
expression,
|
||
} = transform;
|
||
|
||
// 타겟 필드 결정 (비어있으면 소스 필드 사용 = in-place)
|
||
const actualTargetField = targetField || sourceField;
|
||
|
||
switch (type) {
|
||
case "UPPERCASE":
|
||
return rows.map((row) => ({
|
||
...row,
|
||
[actualTargetField]:
|
||
row[sourceField]?.toString().toUpperCase() || row[sourceField],
|
||
}));
|
||
|
||
case "LOWERCASE":
|
||
return rows.map((row) => ({
|
||
...row,
|
||
[actualTargetField]:
|
||
row[sourceField]?.toString().toLowerCase() || row[sourceField],
|
||
}));
|
||
|
||
case "TRIM":
|
||
return rows.map((row) => ({
|
||
...row,
|
||
[actualTargetField]:
|
||
row[sourceField]?.toString().trim() || row[sourceField],
|
||
}));
|
||
|
||
case "EXPLODE":
|
||
return this.applyExplode(
|
||
rows,
|
||
sourceField,
|
||
actualTargetField,
|
||
delimiter || ","
|
||
);
|
||
|
||
case "CONCAT":
|
||
return rows.map((row) => {
|
||
const value1 = row[sourceField] || "";
|
||
// CONCAT은 여러 필드를 합칠 수 있지만, 단순화하여 expression 사용
|
||
const value2 = expression || "";
|
||
return {
|
||
...row,
|
||
[actualTargetField]: `${value1}${separator || ""}${value2}`,
|
||
};
|
||
});
|
||
|
||
case "SPLIT":
|
||
return rows.map((row) => {
|
||
const value = row[sourceField]?.toString() || "";
|
||
const parts = value.split(delimiter || ",");
|
||
const index = splitIndex !== undefined ? splitIndex : 0;
|
||
return {
|
||
...row,
|
||
[actualTargetField]: parts[index] || "",
|
||
};
|
||
});
|
||
|
||
case "REPLACE":
|
||
return rows.map((row) => {
|
||
const value = row[sourceField]?.toString() || "";
|
||
const replaced = value.replace(
|
||
new RegExp(searchValue || "", "g"),
|
||
replaceValue || ""
|
||
);
|
||
return {
|
||
...row,
|
||
[actualTargetField]: replaced,
|
||
};
|
||
});
|
||
|
||
case "CAST":
|
||
return rows.map((row) => {
|
||
const value = row[sourceField];
|
||
let castedValue = value;
|
||
|
||
switch (castType) {
|
||
case "string":
|
||
castedValue = value?.toString() || "";
|
||
break;
|
||
case "number":
|
||
castedValue = parseFloat(value) || 0;
|
||
break;
|
||
case "boolean":
|
||
castedValue = Boolean(value);
|
||
break;
|
||
case "date":
|
||
castedValue = new Date(value);
|
||
break;
|
||
}
|
||
|
||
return {
|
||
...row,
|
||
[actualTargetField]: castedValue,
|
||
};
|
||
});
|
||
|
||
case "FORMAT":
|
||
case "CALCULATE":
|
||
case "JSON_EXTRACT":
|
||
case "CUSTOM":
|
||
// 표현식 기반 변환 (현재는 단순 구현)
|
||
logger.warn(`⚠️ ${type} 변환은 아직 완전히 구현되지 않았습니다.`);
|
||
return rows.map((row) => ({
|
||
...row,
|
||
[actualTargetField]: row[sourceField] || "",
|
||
}));
|
||
|
||
default:
|
||
logger.warn(`⚠️ 지원하지 않는 변환 타입: ${type}`);
|
||
return rows;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* EXPLODE 변환: 1개 행을 N개 행으로 확장
|
||
*/
|
||
private static applyExplode(
|
||
rows: any[],
|
||
sourceField: string,
|
||
targetField: string,
|
||
delimiter: string
|
||
): any[] {
|
||
const expandedRows: any[] = [];
|
||
|
||
for (const row of rows) {
|
||
const value = row[sourceField];
|
||
|
||
if (!value) {
|
||
// 값이 없으면 원본 행 유지
|
||
expandedRows.push(row);
|
||
continue;
|
||
}
|
||
|
||
// 문자열을 구분자로 분리
|
||
const values = value
|
||
.toString()
|
||
.split(delimiter)
|
||
.map((v: string) => v.trim());
|
||
|
||
// 각 값마다 새 행 생성
|
||
for (const val of values) {
|
||
expandedRows.push({
|
||
...row, // 다른 필드들은 복제
|
||
[targetField]: val, // 타겟 필드에 분리된 값 저장
|
||
});
|
||
}
|
||
}
|
||
|
||
logger.info(
|
||
` 💥 EXPLODE: ${rows.length}개 행 → ${expandedRows.length}개 행`
|
||
);
|
||
return expandedRows;
|
||
}
|
||
|
||
/**
|
||
* 🔥 액션 노드 여부 확인
|
||
*/
|
||
private static isActionNode(nodeType: NodeType): boolean {
|
||
return [
|
||
"insertAction",
|
||
"updateAction",
|
||
"deleteAction",
|
||
"upsertAction",
|
||
].includes(nodeType);
|
||
}
|
||
}
|