# 노드 실행 엔진 설계 **작성일**: 2025-01-02 **버전**: 1.0 **상태**: ✅ 확정 --- ## 📋 목차 1. [개요](#개요) 2. [실행 방식](#실행-방식) 3. [데이터 흐름](#데이터-흐름) 4. [오류 처리](#오류-처리) 5. [구현 계획](#구현-계획) --- ## 개요 ### 목적 노드 기반 데이터 플로우의 실행 엔진을 설계하여: - 효율적인 병렬 처리 - 안정적인 오류 처리 - 명확한 데이터 흐름 ### 핵심 원칙 1. **독립적 트랜잭션**: 각 액션 노드는 독립적인 트랜잭션 2. **부분 실패 허용**: 일부 실패해도 성공한 노드는 커밋 3. **연쇄 중단**: 부모 노드 실패 시 자식 노드 스킵 4. **병렬 실행**: 의존성 없는 노드는 병렬 실행 --- ## 실행 방식 ### 1. 기본 구조 ```typescript interface ExecutionContext { sourceData: any[]; // 원본 데이터 nodeResults: Map; // 각 노드 실행 결과 executionOrder: string[]; // 실행 순서 } interface NodeResult { nodeId: string; status: "pending" | "success" | "failed" | "skipped"; data?: any; error?: Error; startTime: number; endTime?: number; } ``` --- ### 2. 실행 단계 #### Step 1: 위상 정렬 (Topological Sort) 노드 간 의존성을 파악하여 실행 순서 결정 ```typescript function topologicalSort(nodes: FlowNode[], edges: FlowEdge[]): string[][] { // DAG(Directed Acyclic Graph) 순회 // 같은 레벨의 노드들은 배열로 그룹화 return [ ["tableSource-1"], // Level 0: 소스 ["insert-1", "update-1", "delete-1"], // Level 1: 병렬 실행 가능 ["update-2"], // Level 2: insert-1에 의존 ]; } ``` #### Step 2: 레벨별 실행 ```typescript async function executeFlow( nodes: FlowNode[], edges: FlowEdge[] ): Promise { const levels = topologicalSort(nodes, edges); const context: ExecutionContext = { sourceData: [], nodeResults: new Map(), executionOrder: [], }; for (const level of levels) { // 같은 레벨의 노드들은 병렬 실행 await executeLevel(level, nodes, context); } return generateExecutionReport(context); } ``` #### Step 3: 레벨 내 병렬 실행 ```typescript async function executeLevel( nodeIds: string[], nodes: FlowNode[], context: ExecutionContext ): Promise { // Promise.allSettled로 병렬 실행 const results = await Promise.allSettled( nodeIds.map((nodeId) => executeNode(nodeId, nodes, context)) ); // 결과 저장 results.forEach((result, index) => { const nodeId = nodeIds[index]; if (result.status === "fulfilled") { context.nodeResults.set(nodeId, result.value); } else { context.nodeResults.set(nodeId, { nodeId, status: "failed", error: result.reason, startTime: Date.now(), endTime: Date.now(), }); } }); } ``` --- ## 데이터 흐름 ### 1. 소스 노드 실행 ```typescript async function executeSourceNode(node: TableSourceNode): Promise { const { tableName, schema, whereConditions } = node.data; // 데이터베이스 쿼리 실행 const query = buildSelectQuery(tableName, schema, whereConditions); const data = await executeQuery(query); return data; } ``` **결과 예시**: ```json [ { "id": 1, "name": "김철수", "age": 30 }, { "id": 2, "name": "이영희", "age": 25 }, { "id": 3, "name": "박민수", "age": 35 } ] ``` --- ### 2. 액션 노드 실행 #### 데이터 전달 방식 ```typescript async function executeNode( nodeId: string, nodes: FlowNode[], context: ExecutionContext ): Promise { const node = nodes.find((n) => n.id === nodeId); const parents = getParentNodes(nodeId, edges); // 1️⃣ 부모 노드 상태 확인 const parentFailed = parents.some((p) => { const parentResult = context.nodeResults.get(p.id); return parentResult?.status === "failed"; }); if (parentFailed) { return { nodeId, status: "skipped", error: new Error("Parent node failed"), startTime: Date.now(), endTime: Date.now(), }; } // 2️⃣ 입력 데이터 준비 const inputData = prepareInputData(node, parents, context); // 3️⃣ 액션 실행 (독립 트랜잭션) return await executeActionWithTransaction(node, inputData); } ``` #### 입력 데이터 준비 ```typescript function prepareInputData( node: FlowNode, parents: FlowNode[], context: ExecutionContext ): any { if (parents.length === 0) { // 소스 노드 return null; } else if (parents.length === 1) { // 단일 부모: 부모의 결과 데이터 전달 const parentResult = context.nodeResults.get(parents[0].id); return parentResult?.data || context.sourceData; } else { // 다중 부모: 모든 부모의 데이터 병합 return parents.map((p) => { const result = context.nodeResults.get(p.id); return result?.data || context.sourceData; }); } } ``` --- ### 3. 병렬 실행 예시 ``` TableSource (100개 레코드) ↓ ┌──────┼──────┐ ↓ ↓ ↓ INSERT UPDATE DELETE (독립) (독립) (독립) ``` **실행 과정**: ```typescript // 1. TableSource 실행 const sourceData = await executeTableSource(); // → [100개 레코드] // 2. 병렬 실행 (Promise.allSettled) const results = await Promise.allSettled([ executeInsertAction(insertNode, sourceData), executeUpdateAction(updateNode, sourceData), executeDeleteAction(deleteNode, sourceData), ]); // 3. 각 액션은 독립 트랜잭션 // - INSERT 실패 → INSERT만 롤백 // - UPDATE 성공 → UPDATE 커밋 // - DELETE 성공 → DELETE 커밋 ``` --- ### 4. 연쇄 실행 예시 ``` TableSource ↓ INSERT ❌ (실패) ↓ UPDATE-2 ⏭️ (스킵) ``` **실행 과정**: ```typescript // 1. TableSource 실행 const sourceData = await executeTableSource(); // → 성공 ✅ // 2. INSERT 실행 const insertResult = await executeInsertAction(insertNode, sourceData); // → 실패 ❌ (롤백됨) // 3. UPDATE-2 실행 시도 const parentFailed = insertResult.status === "failed"; if (parentFailed) { return { status: "skipped", reason: "Parent INSERT failed", }; // → 스킬 ⏭️ } ``` --- ## 오류 처리 ### 1. 독립 트랜잭션 각 액션 노드는 자체 트랜잭션을 가짐 ```typescript async function executeActionWithTransaction( node: FlowNode, inputData: any ): Promise { // 트랜잭션 시작 const transaction = await db.beginTransaction(); try { const result = await performAction(node, inputData, transaction); // 성공 시 커밋 await transaction.commit(); return { nodeId: node.id, status: "success", data: result, startTime: Date.now(), endTime: Date.now(), }; } catch (error) { // 실패 시 롤백 await transaction.rollback(); return { nodeId: node.id, status: "failed", error: error, startTime: Date.now(), endTime: Date.now(), }; } } ``` --- ### 2. 부분 실패 허용 ```typescript // Promise.allSettled 사용 const results = await Promise.allSettled([action1(), action2(), action3()]); // 결과 수집 const summary = { total: results.length, success: results.filter((r) => r.status === "fulfilled").length, failed: results.filter((r) => r.status === "rejected").length, details: results, }; ``` **예시 결과**: ```json { "total": 3, "success": 2, "failed": 1, "details": [ { "status": "rejected", "reason": "Duplicate key error" }, { "status": "fulfilled", "value": { "updatedCount": 100 } }, { "status": "fulfilled", "value": { "deletedCount": 50 } } ] } ``` --- ### 3. 연쇄 중단 부모 노드 실패 시 자식 노드 자동 스킵 ```typescript function shouldSkipNode(node: FlowNode, context: ExecutionContext): boolean { const parents = getParentNodes(node.id); return parents.some((parent) => { const parentResult = context.nodeResults.get(parent.id); return parentResult?.status === "failed"; }); } ``` --- ### 4. 오류 메시지 ```typescript interface ExecutionError { nodeId: string; nodeName: string; errorType: "validation" | "execution" | "connection" | "timeout"; message: string; details?: any; timestamp: number; } ``` **오류 메시지 예시**: ```json { "nodeId": "insert-1", "nodeName": "INSERT 액션", "errorType": "execution", "message": "Duplicate key error: 'email' already exists", "details": { "table": "users", "constraint": "users_email_unique", "value": "test@example.com" }, "timestamp": 1704182400000 } ``` --- ## 구현 계획 ### Phase 1: 기본 실행 엔진 (우선순위: 높음) **작업 항목**: 1. ✅ 위상 정렬 알고리즘 구현 2. ✅ 레벨별 실행 로직 3. ✅ Promise.allSettled 기반 병렬 실행 4. ✅ 독립 트랜잭션 처리 5. ✅ 연쇄 중단 로직 **예상 시간**: 1일 --- ### Phase 2: 소스 노드 실행 (우선순위: 높음) **작업 항목**: 1. ✅ TableSource 실행기 2. ✅ ExternalDBSource 실행기 3. ✅ RestAPISource 실행기 4. ✅ 데이터 캐싱 **예상 시간**: 1일 --- ### Phase 3: 액션 노드 실행 (우선순위: 높음) **작업 항목**: 1. ✅ INSERT 액션 실행기 2. ✅ UPDATE 액션 실행기 3. ✅ DELETE 액션 실행기 4. ✅ UPSERT 액션 실행기 5. ✅ 필드 매핑 적용 **예상 시간**: 2일 --- ### Phase 4: 변환 노드 실행 (우선순위: 중간) **작업 항목**: 1. ✅ FieldMapping 실행기 2. ✅ DataTransform 실행기 3. ✅ Condition 분기 처리 **예상 시간**: 1일 --- ### Phase 5: 오류 처리 및 모니터링 (우선순위: 중간) **작업 항목**: 1. ✅ 상세 오류 메시지 2. ✅ 실행 결과 리포트 3. ✅ 실행 로그 저장 4. ✅ 실시간 진행 상태 표시 **예상 시간**: 1일 --- ### Phase 6: 최적화 (우선순위: 낮음) **작업 항목**: 1. ⏳ 데이터 스트리밍 (대용량 데이터) 2. ⏳ 배치 처리 최적화 3. ⏳ 병렬 처리 튜닝 4. ⏳ 캐싱 전략 **예상 시간**: 2일 --- ## 실행 결과 예시 ### 성공 케이스 ```json { "flowId": "flow-123", "flowName": "사용자 데이터 동기화", "status": "completed", "startTime": "2025-01-02T10:00:00Z", "endTime": "2025-01-02T10:00:05Z", "duration": 5000, "nodes": [ { "nodeId": "source-1", "nodeName": "TableSource", "status": "success", "recordCount": 100, "duration": 500 }, { "nodeId": "insert-1", "nodeName": "INSERT", "status": "success", "insertedCount": 100, "duration": 2000 }, { "nodeId": "update-1", "nodeName": "UPDATE", "status": "success", "updatedCount": 80, "duration": 1500 } ], "summary": { "total": 3, "success": 3, "failed": 0, "skipped": 0 } } ``` --- ### 부분 실패 케이스 ```json { "flowId": "flow-124", "flowName": "데이터 처리", "status": "partial_success", "startTime": "2025-01-02T11:00:00Z", "endTime": "2025-01-02T11:00:08Z", "duration": 8000, "nodes": [ { "nodeId": "source-1", "nodeName": "TableSource", "status": "success", "recordCount": 100 }, { "nodeId": "insert-1", "nodeName": "INSERT", "status": "failed", "error": "Duplicate key error", "details": "email 'test@example.com' already exists" }, { "nodeId": "update-2", "nodeName": "UPDATE-2", "status": "skipped", "reason": "Parent INSERT failed" }, { "nodeId": "update-1", "nodeName": "UPDATE", "status": "success", "updatedCount": 50 }, { "nodeId": "delete-1", "nodeName": "DELETE", "status": "success", "deletedCount": 20 } ], "summary": { "total": 5, "success": 3, "failed": 1, "skipped": 1 } } ``` --- ## 다음 단계 1. ✅ 데이터 처리 방식 확정 (완료) 2. ⏳ 실행 엔진 구현 시작 3. ⏳ 테스트 케이스 작성 4. ⏳ UI에서 실행 결과 표시 --- **참고 문서**: - [노드*기반*제어*시스템*개선\_계획.md](./노드_기반_제어_시스템_개선_계획.md) - [노드*연결*규칙\_설계.md](./노드_연결_규칙_설계.md) - [노드*구조*개선안.md](./노드_구조_개선안.md)