ERP-node/docs/노드_실행_엔진_설계.md

618 lines
12 KiB
Markdown
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 노드 실행 엔진 설계
**작성일**: 2025-01-02
**버전**: 1.0
**상태**: ✅ 확정
---
## 📋 목차
1. [개요](#개요)
2. [실행 방식](#실행-방식)
3. [데이터 흐름](#데이터-흐름)
4. [오류 처리](#오류-처리)
5. [구현 계획](#구현-계획)
---
## 개요
### 목적
노드 기반 데이터 플로우의 실행 엔진을 설계하여:
- 효율적인 병렬 처리
- 안정적인 오류 처리
- 명확한 데이터 흐름
### 핵심 원칙
1. **독립적 트랜잭션**: 각 액션 노드는 독립적인 트랜잭션
2. **부분 실패 허용**: 일부 실패해도 성공한 노드는 커밋
3. **연쇄 중단**: 부모 노드 실패 시 자식 노드 스킵
4. **병렬 실행**: 의존성 없는 노드는 병렬 실행
---
## 실행 방식
### 1. 기본 구조
```typescript
interface ExecutionContext {
sourceData: any[]; // 원본 데이터
nodeResults: Map<string, NodeResult>; // 각 노드 실행 결과
executionOrder: string[]; // 실행 순서
}
interface NodeResult {
nodeId: string;
status: "pending" | "success" | "failed" | "skipped";
data?: any;
error?: Error;
startTime: number;
endTime?: number;
}
```
---
### 2. 실행 단계
#### Step 1: 위상 정렬 (Topological Sort)
노드 간 의존성을 파악하여 실행 순서 결정
```typescript
function topologicalSort(nodes: FlowNode[], edges: FlowEdge[]): string[][] {
// DAG(Directed Acyclic Graph) 순회
// 같은 레벨의 노드들은 배열로 그룹화
return [
["tableSource-1"], // Level 0: 소스
["insert-1", "update-1", "delete-1"], // Level 1: 병렬 실행 가능
["update-2"], // Level 2: insert-1에 의존
];
}
```
#### Step 2: 레벨별 실행
```typescript
async function executeFlow(
nodes: FlowNode[],
edges: FlowEdge[]
): Promise<ExecutionResult> {
const levels = topologicalSort(nodes, edges);
const context: ExecutionContext = {
sourceData: [],
nodeResults: new Map(),
executionOrder: [],
};
for (const level of levels) {
// 같은 레벨의 노드들은 병렬 실행
await executeLevel(level, nodes, context);
}
return generateExecutionReport(context);
}
```
#### Step 3: 레벨 내 병렬 실행
```typescript
async function executeLevel(
nodeIds: string[],
nodes: FlowNode[],
context: ExecutionContext
): Promise<void> {
// Promise.allSettled로 병렬 실행
const results = await Promise.allSettled(
nodeIds.map((nodeId) => executeNode(nodeId, nodes, context))
);
// 결과 저장
results.forEach((result, index) => {
const nodeId = nodeIds[index];
if (result.status === "fulfilled") {
context.nodeResults.set(nodeId, result.value);
} else {
context.nodeResults.set(nodeId, {
nodeId,
status: "failed",
error: result.reason,
startTime: Date.now(),
endTime: Date.now(),
});
}
});
}
```
---
## 데이터 흐름
### 1. 소스 노드 실행
```typescript
async function executeSourceNode(node: TableSourceNode): Promise<any[]> {
const { tableName, schema, whereConditions } = node.data;
// 데이터베이스 쿼리 실행
const query = buildSelectQuery(tableName, schema, whereConditions);
const data = await executeQuery(query);
return data;
}
```
**결과 예시**:
```json
[
{ "id": 1, "name": "김철수", "age": 30 },
{ "id": 2, "name": "이영희", "age": 25 },
{ "id": 3, "name": "박민수", "age": 35 }
]
```
---
### 2. 액션 노드 실행
#### 데이터 전달 방식
```typescript
async function executeNode(
nodeId: string,
nodes: FlowNode[],
context: ExecutionContext
): Promise<NodeResult> {
const node = nodes.find((n) => n.id === nodeId);
const parents = getParentNodes(nodeId, edges);
// 1⃣ 부모 노드 상태 확인
const parentFailed = parents.some((p) => {
const parentResult = context.nodeResults.get(p.id);
return parentResult?.status === "failed";
});
if (parentFailed) {
return {
nodeId,
status: "skipped",
error: new Error("Parent node failed"),
startTime: Date.now(),
endTime: Date.now(),
};
}
// 2⃣ 입력 데이터 준비
const inputData = prepareInputData(node, parents, context);
// 3⃣ 액션 실행 (독립 트랜잭션)
return await executeActionWithTransaction(node, inputData);
}
```
#### 입력 데이터 준비
```typescript
function prepareInputData(
node: FlowNode,
parents: FlowNode[],
context: ExecutionContext
): any {
if (parents.length === 0) {
// 소스 노드
return null;
} else if (parents.length === 1) {
// 단일 부모: 부모의 결과 데이터 전달
const parentResult = context.nodeResults.get(parents[0].id);
return parentResult?.data || context.sourceData;
} else {
// 다중 부모: 모든 부모의 데이터 병합
return parents.map((p) => {
const result = context.nodeResults.get(p.id);
return result?.data || context.sourceData;
});
}
}
```
---
### 3. 병렬 실행 예시
```
TableSource
(100개 레코드)
┌──────┼──────┐
↓ ↓ ↓
INSERT UPDATE DELETE
(독립) (독립) (독립)
```
**실행 과정**:
```typescript
// 1. TableSource 실행
const sourceData = await executeTableSource();
// → [100개 레코드]
// 2. 병렬 실행 (Promise.allSettled)
const results = await Promise.allSettled([
executeInsertAction(insertNode, sourceData),
executeUpdateAction(updateNode, sourceData),
executeDeleteAction(deleteNode, sourceData),
]);
// 3. 각 액션은 독립 트랜잭션
// - INSERT 실패 → INSERT만 롤백
// - UPDATE 성공 → UPDATE 커밋
// - DELETE 성공 → DELETE 커밋
```
---
### 4. 연쇄 실행 예시
```
TableSource
INSERT
❌ (실패)
UPDATE-2
⏭️ (스킵)
```
**실행 과정**:
```typescript
// 1. TableSource 실행
const sourceData = await executeTableSource();
// → 성공 ✅
// 2. INSERT 실행
const insertResult = await executeInsertAction(insertNode, sourceData);
// → 실패 ❌ (롤백됨)
// 3. UPDATE-2 실행 시도
const parentFailed = insertResult.status === "failed";
if (parentFailed) {
return {
status: "skipped",
reason: "Parent INSERT failed",
};
// → 스킬 ⏭️
}
```
---
## 오류 처리
### 1. 독립 트랜잭션
각 액션 노드는 자체 트랜잭션을 가짐
```typescript
async function executeActionWithTransaction(
node: FlowNode,
inputData: any
): Promise<NodeResult> {
// 트랜잭션 시작
const transaction = await db.beginTransaction();
try {
const result = await performAction(node, inputData, transaction);
// 성공 시 커밋
await transaction.commit();
return {
nodeId: node.id,
status: "success",
data: result,
startTime: Date.now(),
endTime: Date.now(),
};
} catch (error) {
// 실패 시 롤백
await transaction.rollback();
return {
nodeId: node.id,
status: "failed",
error: error,
startTime: Date.now(),
endTime: Date.now(),
};
}
}
```
---
### 2. 부분 실패 허용
```typescript
// Promise.allSettled 사용
const results = await Promise.allSettled([action1(), action2(), action3()]);
// 결과 수집
const summary = {
total: results.length,
success: results.filter((r) => r.status === "fulfilled").length,
failed: results.filter((r) => r.status === "rejected").length,
details: results,
};
```
**예시 결과**:
```json
{
"total": 3,
"success": 2,
"failed": 1,
"details": [
{ "status": "rejected", "reason": "Duplicate key error" },
{ "status": "fulfilled", "value": { "updatedCount": 100 } },
{ "status": "fulfilled", "value": { "deletedCount": 50 } }
]
}
```
---
### 3. 연쇄 중단
부모 노드 실패 시 자식 노드 자동 스킵
```typescript
function shouldSkipNode(node: FlowNode, context: ExecutionContext): boolean {
const parents = getParentNodes(node.id);
return parents.some((parent) => {
const parentResult = context.nodeResults.get(parent.id);
return parentResult?.status === "failed";
});
}
```
---
### 4. 오류 메시지
```typescript
interface ExecutionError {
nodeId: string;
nodeName: string;
errorType: "validation" | "execution" | "connection" | "timeout";
message: string;
details?: any;
timestamp: number;
}
```
**오류 메시지 예시**:
```json
{
"nodeId": "insert-1",
"nodeName": "INSERT 액션",
"errorType": "execution",
"message": "Duplicate key error: 'email' already exists",
"details": {
"table": "users",
"constraint": "users_email_unique",
"value": "test@example.com"
},
"timestamp": 1704182400000
}
```
---
## 구현 계획
### Phase 1: 기본 실행 엔진 (우선순위: 높음)
**작업 항목**:
1. ✅ 위상 정렬 알고리즘 구현
2. ✅ 레벨별 실행 로직
3. ✅ Promise.allSettled 기반 병렬 실행
4. ✅ 독립 트랜잭션 처리
5. ✅ 연쇄 중단 로직
**예상 시간**: 1일
---
### Phase 2: 소스 노드 실행 (우선순위: 높음)
**작업 항목**:
1. ✅ TableSource 실행기
2. ✅ ExternalDBSource 실행기
3. ✅ RestAPISource 실행기
4. ✅ 데이터 캐싱
**예상 시간**: 1일
---
### Phase 3: 액션 노드 실행 (우선순위: 높음)
**작업 항목**:
1. ✅ INSERT 액션 실행기
2. ✅ UPDATE 액션 실행기
3. ✅ DELETE 액션 실행기
4. ✅ UPSERT 액션 실행기
5. ✅ 필드 매핑 적용
**예상 시간**: 2일
---
### Phase 4: 변환 노드 실행 (우선순위: 중간)
**작업 항목**:
1. ✅ FieldMapping 실행기
2. ✅ DataTransform 실행기
3. ✅ Condition 분기 처리
**예상 시간**: 1일
---
### Phase 5: 오류 처리 및 모니터링 (우선순위: 중간)
**작업 항목**:
1. ✅ 상세 오류 메시지
2. ✅ 실행 결과 리포트
3. ✅ 실행 로그 저장
4. ✅ 실시간 진행 상태 표시
**예상 시간**: 1일
---
### Phase 6: 최적화 (우선순위: 낮음)
**작업 항목**:
1. ⏳ 데이터 스트리밍 (대용량 데이터)
2. ⏳ 배치 처리 최적화
3. ⏳ 병렬 처리 튜닝
4. ⏳ 캐싱 전략
**예상 시간**: 2일
---
## 실행 결과 예시
### 성공 케이스
```json
{
"flowId": "flow-123",
"flowName": "사용자 데이터 동기화",
"status": "completed",
"startTime": "2025-01-02T10:00:00Z",
"endTime": "2025-01-02T10:00:05Z",
"duration": 5000,
"nodes": [
{
"nodeId": "source-1",
"nodeName": "TableSource",
"status": "success",
"recordCount": 100,
"duration": 500
},
{
"nodeId": "insert-1",
"nodeName": "INSERT",
"status": "success",
"insertedCount": 100,
"duration": 2000
},
{
"nodeId": "update-1",
"nodeName": "UPDATE",
"status": "success",
"updatedCount": 80,
"duration": 1500
}
],
"summary": {
"total": 3,
"success": 3,
"failed": 0,
"skipped": 0
}
}
```
---
### 부분 실패 케이스
```json
{
"flowId": "flow-124",
"flowName": "데이터 처리",
"status": "partial_success",
"startTime": "2025-01-02T11:00:00Z",
"endTime": "2025-01-02T11:00:08Z",
"duration": 8000,
"nodes": [
{
"nodeId": "source-1",
"nodeName": "TableSource",
"status": "success",
"recordCount": 100
},
{
"nodeId": "insert-1",
"nodeName": "INSERT",
"status": "failed",
"error": "Duplicate key error",
"details": "email 'test@example.com' already exists"
},
{
"nodeId": "update-2",
"nodeName": "UPDATE-2",
"status": "skipped",
"reason": "Parent INSERT failed"
},
{
"nodeId": "update-1",
"nodeName": "UPDATE",
"status": "success",
"updatedCount": 50
},
{
"nodeId": "delete-1",
"nodeName": "DELETE",
"status": "success",
"deletedCount": 20
}
],
"summary": {
"total": 5,
"success": 3,
"failed": 1,
"skipped": 1
}
}
```
---
## 다음 단계
1. ✅ 데이터 처리 방식 확정 (완료)
2. ⏳ 실행 엔진 구현 시작
3. ⏳ 테스트 케이스 작성
4. ⏳ UI에서 실행 결과 표시
---
**참고 문서**:
- [노드*기반*제어*시스템*개선\_계획.md](./노드_기반_제어_시스템_개선_계획.md)
- [노드*연결*규칙\_설계.md](./노드_연결_규칙_설계.md)
- [노드*구조*개선안.md](./노드_구조_개선안.md)