ERP-node/docs/노드_실행_엔진_설계.md

618 lines
12 KiB
Markdown
Raw Permalink Normal View History

feat: 노드 기반 데이터 플로우 시스템 구현 - 노드 에디터 UI 구현 (React Flow 기반) - TableSource, DataTransform, INSERT, UPDATE, DELETE, UPSERT 노드 - 드래그앤드롭 노드 추가 및 연결 - 속성 패널을 통한 노드 설정 - 실시간 필드 라벨 표시 (column_labels 테이블 연동) - 데이터 변환 노드 (DataTransform) 기능 - EXPLODE: 구분자로 1개 행 → 여러 행 확장 - UPPERCASE, LOWERCASE, TRIM, CONCAT, SPLIT, REPLACE 등 12가지 변환 타입 - In-place 변환 지원 (타겟 필드 생략 시 소스 필드 덮어쓰기) - 변환된 필드가 하위 액션 노드에 자동 전달 - 노드 플로우 실행 엔진 - 위상 정렬을 통한 노드 실행 순서 결정 - 레벨별 병렬 실행 (Promise.allSettled) - 부분 실패 허용 (한 노드 실패 시 연결된 하위 노드만 스킵) - 트랜잭션 기반 안전한 데이터 처리 - UPSERT 액션 로직 구현 - DB 제약 조건 없이 SELECT → UPDATE or INSERT 방식 - 복합 충돌 키 지원 (예: sales_no + product_name) - 파라미터 인덱스 정확한 매핑 - 데이터 소스 자동 감지 - 테이블 선택 데이터 (selectedRowsData) 자동 주입 - 폼 입력 데이터 (formData) 자동 주입 - TableSource 노드가 외부 데이터 우선 사용 - 버튼 컴포넌트 통합 - 기존 관계 실행 + 새 노드 플로우 실행 하이브리드 지원 - 노드 플로우 선택 UI 추가 - API 클라이언트 통합 (Axios) - 개발 문서 작성 - 노드 기반 제어 시스템 개선 계획 - 노드 연결 규칙 설계 - 노드 실행 엔진 설계 - 노드 구조 개선안 - 버튼 통합 분석
2025-10-02 16:22:29 +09:00
# 노드 실행 엔진 설계
**작성일**: 2025-01-02
**버전**: 1.0
**상태**: ✅ 확정
---
## 📋 목차
1. [개요](#개요)
2. [실행 방식](#실행-방식)
3. [데이터 흐름](#데이터-흐름)
4. [오류 처리](#오류-처리)
5. [구현 계획](#구현-계획)
---
## 개요
### 목적
노드 기반 데이터 플로우의 실행 엔진을 설계하여:
- 효율적인 병렬 처리
- 안정적인 오류 처리
- 명확한 데이터 흐름
### 핵심 원칙
1. **독립적 트랜잭션**: 각 액션 노드는 독립적인 트랜잭션
2. **부분 실패 허용**: 일부 실패해도 성공한 노드는 커밋
3. **연쇄 중단**: 부모 노드 실패 시 자식 노드 스킵
4. **병렬 실행**: 의존성 없는 노드는 병렬 실행
---
## 실행 방식
### 1. 기본 구조
```typescript
interface ExecutionContext {
sourceData: any[]; // 원본 데이터
nodeResults: Map<string, NodeResult>; // 각 노드 실행 결과
executionOrder: string[]; // 실행 순서
}
interface NodeResult {
nodeId: string;
status: "pending" | "success" | "failed" | "skipped";
data?: any;
error?: Error;
startTime: number;
endTime?: number;
}
```
---
### 2. 실행 단계
#### Step 1: 위상 정렬 (Topological Sort)
노드 간 의존성을 파악하여 실행 순서 결정
```typescript
function topologicalSort(nodes: FlowNode[], edges: FlowEdge[]): string[][] {
// DAG(Directed Acyclic Graph) 순회
// 같은 레벨의 노드들은 배열로 그룹화
return [
["tableSource-1"], // Level 0: 소스
["insert-1", "update-1", "delete-1"], // Level 1: 병렬 실행 가능
["update-2"], // Level 2: insert-1에 의존
];
}
```
#### Step 2: 레벨별 실행
```typescript
async function executeFlow(
nodes: FlowNode[],
edges: FlowEdge[]
): Promise<ExecutionResult> {
const levels = topologicalSort(nodes, edges);
const context: ExecutionContext = {
sourceData: [],
nodeResults: new Map(),
executionOrder: [],
};
for (const level of levels) {
// 같은 레벨의 노드들은 병렬 실행
await executeLevel(level, nodes, context);
}
return generateExecutionReport(context);
}
```
#### Step 3: 레벨 내 병렬 실행
```typescript
async function executeLevel(
nodeIds: string[],
nodes: FlowNode[],
context: ExecutionContext
): Promise<void> {
// Promise.allSettled로 병렬 실행
const results = await Promise.allSettled(
nodeIds.map((nodeId) => executeNode(nodeId, nodes, context))
);
// 결과 저장
results.forEach((result, index) => {
const nodeId = nodeIds[index];
if (result.status === "fulfilled") {
context.nodeResults.set(nodeId, result.value);
} else {
context.nodeResults.set(nodeId, {
nodeId,
status: "failed",
error: result.reason,
startTime: Date.now(),
endTime: Date.now(),
});
}
});
}
```
---
## 데이터 흐름
### 1. 소스 노드 실행
```typescript
async function executeSourceNode(node: TableSourceNode): Promise<any[]> {
const { tableName, schema, whereConditions } = node.data;
// 데이터베이스 쿼리 실행
const query = buildSelectQuery(tableName, schema, whereConditions);
const data = await executeQuery(query);
return data;
}
```
**결과 예시**:
```json
[
{ "id": 1, "name": "김철수", "age": 30 },
{ "id": 2, "name": "이영희", "age": 25 },
{ "id": 3, "name": "박민수", "age": 35 }
]
```
---
### 2. 액션 노드 실행
#### 데이터 전달 방식
```typescript
async function executeNode(
nodeId: string,
nodes: FlowNode[],
context: ExecutionContext
): Promise<NodeResult> {
const node = nodes.find((n) => n.id === nodeId);
const parents = getParentNodes(nodeId, edges);
// 1⃣ 부모 노드 상태 확인
const parentFailed = parents.some((p) => {
const parentResult = context.nodeResults.get(p.id);
return parentResult?.status === "failed";
});
if (parentFailed) {
return {
nodeId,
status: "skipped",
error: new Error("Parent node failed"),
startTime: Date.now(),
endTime: Date.now(),
};
}
// 2⃣ 입력 데이터 준비
const inputData = prepareInputData(node, parents, context);
// 3⃣ 액션 실행 (독립 트랜잭션)
return await executeActionWithTransaction(node, inputData);
}
```
#### 입력 데이터 준비
```typescript
function prepareInputData(
node: FlowNode,
parents: FlowNode[],
context: ExecutionContext
): any {
if (parents.length === 0) {
// 소스 노드
return null;
} else if (parents.length === 1) {
// 단일 부모: 부모의 결과 데이터 전달
const parentResult = context.nodeResults.get(parents[0].id);
return parentResult?.data || context.sourceData;
} else {
// 다중 부모: 모든 부모의 데이터 병합
return parents.map((p) => {
const result = context.nodeResults.get(p.id);
return result?.data || context.sourceData;
});
}
}
```
---
### 3. 병렬 실행 예시
```
TableSource
(100개 레코드)
┌──────┼──────┐
↓ ↓ ↓
INSERT UPDATE DELETE
(독립) (독립) (독립)
```
**실행 과정**:
```typescript
// 1. TableSource 실행
const sourceData = await executeTableSource();
// → [100개 레코드]
// 2. 병렬 실행 (Promise.allSettled)
const results = await Promise.allSettled([
executeInsertAction(insertNode, sourceData),
executeUpdateAction(updateNode, sourceData),
executeDeleteAction(deleteNode, sourceData),
]);
// 3. 각 액션은 독립 트랜잭션
// - INSERT 실패 → INSERT만 롤백
// - UPDATE 성공 → UPDATE 커밋
// - DELETE 성공 → DELETE 커밋
```
---
### 4. 연쇄 실행 예시
```
TableSource
INSERT
❌ (실패)
UPDATE-2
⏭️ (스킵)
```
**실행 과정**:
```typescript
// 1. TableSource 실행
const sourceData = await executeTableSource();
// → 성공 ✅
// 2. INSERT 실행
const insertResult = await executeInsertAction(insertNode, sourceData);
// → 실패 ❌ (롤백됨)
// 3. UPDATE-2 실행 시도
const parentFailed = insertResult.status === "failed";
if (parentFailed) {
return {
status: "skipped",
reason: "Parent INSERT failed",
};
// → 스킬 ⏭️
}
```
---
## 오류 처리
### 1. 독립 트랜잭션
각 액션 노드는 자체 트랜잭션을 가짐
```typescript
async function executeActionWithTransaction(
node: FlowNode,
inputData: any
): Promise<NodeResult> {
// 트랜잭션 시작
const transaction = await db.beginTransaction();
try {
const result = await performAction(node, inputData, transaction);
// 성공 시 커밋
await transaction.commit();
return {
nodeId: node.id,
status: "success",
data: result,
startTime: Date.now(),
endTime: Date.now(),
};
} catch (error) {
// 실패 시 롤백
await transaction.rollback();
return {
nodeId: node.id,
status: "failed",
error: error,
startTime: Date.now(),
endTime: Date.now(),
};
}
}
```
---
### 2. 부분 실패 허용
```typescript
// Promise.allSettled 사용
const results = await Promise.allSettled([action1(), action2(), action3()]);
// 결과 수집
const summary = {
total: results.length,
success: results.filter((r) => r.status === "fulfilled").length,
failed: results.filter((r) => r.status === "rejected").length,
details: results,
};
```
**예시 결과**:
```json
{
"total": 3,
"success": 2,
"failed": 1,
"details": [
{ "status": "rejected", "reason": "Duplicate key error" },
{ "status": "fulfilled", "value": { "updatedCount": 100 } },
{ "status": "fulfilled", "value": { "deletedCount": 50 } }
]
}
```
---
### 3. 연쇄 중단
부모 노드 실패 시 자식 노드 자동 스킵
```typescript
function shouldSkipNode(node: FlowNode, context: ExecutionContext): boolean {
const parents = getParentNodes(node.id);
return parents.some((parent) => {
const parentResult = context.nodeResults.get(parent.id);
return parentResult?.status === "failed";
});
}
```
---
### 4. 오류 메시지
```typescript
interface ExecutionError {
nodeId: string;
nodeName: string;
errorType: "validation" | "execution" | "connection" | "timeout";
message: string;
details?: any;
timestamp: number;
}
```
**오류 메시지 예시**:
```json
{
"nodeId": "insert-1",
"nodeName": "INSERT 액션",
"errorType": "execution",
"message": "Duplicate key error: 'email' already exists",
"details": {
"table": "users",
"constraint": "users_email_unique",
"value": "test@example.com"
},
"timestamp": 1704182400000
}
```
---
## 구현 계획
### Phase 1: 기본 실행 엔진 (우선순위: 높음)
**작업 항목**:
1. ✅ 위상 정렬 알고리즘 구현
2. ✅ 레벨별 실행 로직
3. ✅ Promise.allSettled 기반 병렬 실행
4. ✅ 독립 트랜잭션 처리
5. ✅ 연쇄 중단 로직
**예상 시간**: 1일
---
### Phase 2: 소스 노드 실행 (우선순위: 높음)
**작업 항목**:
1. ✅ TableSource 실행기
2. ✅ ExternalDBSource 실행기
3. ✅ RestAPISource 실행기
4. ✅ 데이터 캐싱
**예상 시간**: 1일
---
### Phase 3: 액션 노드 실행 (우선순위: 높음)
**작업 항목**:
1. ✅ INSERT 액션 실행기
2. ✅ UPDATE 액션 실행기
3. ✅ DELETE 액션 실행기
4. ✅ UPSERT 액션 실행기
5. ✅ 필드 매핑 적용
**예상 시간**: 2일
---
### Phase 4: 변환 노드 실행 (우선순위: 중간)
**작업 항목**:
1. ✅ FieldMapping 실행기
2. ✅ DataTransform 실행기
3. ✅ Condition 분기 처리
**예상 시간**: 1일
---
### Phase 5: 오류 처리 및 모니터링 (우선순위: 중간)
**작업 항목**:
1. ✅ 상세 오류 메시지
2. ✅ 실행 결과 리포트
3. ✅ 실행 로그 저장
4. ✅ 실시간 진행 상태 표시
**예상 시간**: 1일
---
### Phase 6: 최적화 (우선순위: 낮음)
**작업 항목**:
1. ⏳ 데이터 스트리밍 (대용량 데이터)
2. ⏳ 배치 처리 최적화
3. ⏳ 병렬 처리 튜닝
4. ⏳ 캐싱 전략
**예상 시간**: 2일
---
## 실행 결과 예시
### 성공 케이스
```json
{
"flowId": "flow-123",
"flowName": "사용자 데이터 동기화",
"status": "completed",
"startTime": "2025-01-02T10:00:00Z",
"endTime": "2025-01-02T10:00:05Z",
"duration": 5000,
"nodes": [
{
"nodeId": "source-1",
"nodeName": "TableSource",
"status": "success",
"recordCount": 100,
"duration": 500
},
{
"nodeId": "insert-1",
"nodeName": "INSERT",
"status": "success",
"insertedCount": 100,
"duration": 2000
},
{
"nodeId": "update-1",
"nodeName": "UPDATE",
"status": "success",
"updatedCount": 80,
"duration": 1500
}
],
"summary": {
"total": 3,
"success": 3,
"failed": 0,
"skipped": 0
}
}
```
---
### 부분 실패 케이스
```json
{
"flowId": "flow-124",
"flowName": "데이터 처리",
"status": "partial_success",
"startTime": "2025-01-02T11:00:00Z",
"endTime": "2025-01-02T11:00:08Z",
"duration": 8000,
"nodes": [
{
"nodeId": "source-1",
"nodeName": "TableSource",
"status": "success",
"recordCount": 100
},
{
"nodeId": "insert-1",
"nodeName": "INSERT",
"status": "failed",
"error": "Duplicate key error",
"details": "email 'test@example.com' already exists"
},
{
"nodeId": "update-2",
"nodeName": "UPDATE-2",
"status": "skipped",
"reason": "Parent INSERT failed"
},
{
"nodeId": "update-1",
"nodeName": "UPDATE",
"status": "success",
"updatedCount": 50
},
{
"nodeId": "delete-1",
"nodeName": "DELETE",
"status": "success",
"deletedCount": 20
}
],
"summary": {
"total": 5,
"success": 3,
"failed": 1,
"skipped": 1
}
}
```
---
## 다음 단계
1. ✅ 데이터 처리 방식 확정 (완료)
2. ⏳ 실행 엔진 구현 시작
3. ⏳ 테스트 케이스 작성
4. ⏳ UI에서 실행 결과 표시
---
**참고 문서**:
- [노드*기반*제어*시스템*개선\_계획.md](./노드_기반_제어_시스템_개선_계획.md)
- [노드*연결*규칙\_설계.md](./노드_연결_규칙_설계.md)
- [노드*구조*개선안.md](./노드_구조_개선안.md)