ERP-node/docs/노드_실행_엔진_설계.md

12 KiB
Raw Permalink Blame History

노드 실행 엔진 설계

작성일: 2025-01-02
버전: 1.0
상태: 확정


📋 목차

  1. 개요
  2. 실행 방식
  3. 데이터 흐름
  4. 오류 처리
  5. 구현 계획

개요

목적

노드 기반 데이터 플로우의 실행 엔진을 설계하여:

  • 효율적인 병렬 처리
  • 안정적인 오류 처리
  • 명확한 데이터 흐름

핵심 원칙

  1. 독립적 트랜잭션: 각 액션 노드는 독립적인 트랜잭션
  2. 부분 실패 허용: 일부 실패해도 성공한 노드는 커밋
  3. 연쇄 중단: 부모 노드 실패 시 자식 노드 스킵
  4. 병렬 실행: 의존성 없는 노드는 병렬 실행

실행 방식

1. 기본 구조

interface ExecutionContext {
  sourceData: any[]; // 원본 데이터
  nodeResults: Map<string, NodeResult>; // 각 노드 실행 결과
  executionOrder: string[]; // 실행 순서
}

interface NodeResult {
  nodeId: string;
  status: "pending" | "success" | "failed" | "skipped";
  data?: any;
  error?: Error;
  startTime: number;
  endTime?: number;
}

2. 실행 단계

Step 1: 위상 정렬 (Topological Sort)

노드 간 의존성을 파악하여 실행 순서 결정

function topologicalSort(nodes: FlowNode[], edges: FlowEdge[]): string[][] {
  // DAG(Directed Acyclic Graph) 순회
  // 같은 레벨의 노드들은 배열로 그룹화

  return [
    ["tableSource-1"], // Level 0: 소스
    ["insert-1", "update-1", "delete-1"], // Level 1: 병렬 실행 가능
    ["update-2"], // Level 2: insert-1에 의존
  ];
}

Step 2: 레벨별 실행

async function executeFlow(
  nodes: FlowNode[],
  edges: FlowEdge[]
): Promise<ExecutionResult> {
  const levels = topologicalSort(nodes, edges);
  const context: ExecutionContext = {
    sourceData: [],
    nodeResults: new Map(),
    executionOrder: [],
  };

  for (const level of levels) {
    // 같은 레벨의 노드들은 병렬 실행
    await executeLevel(level, nodes, context);
  }

  return generateExecutionReport(context);
}

Step 3: 레벨 내 병렬 실행

async function executeLevel(
  nodeIds: string[],
  nodes: FlowNode[],
  context: ExecutionContext
): Promise<void> {
  // Promise.allSettled로 병렬 실행
  const results = await Promise.allSettled(
    nodeIds.map((nodeId) => executeNode(nodeId, nodes, context))
  );

  // 결과 저장
  results.forEach((result, index) => {
    const nodeId = nodeIds[index];
    if (result.status === "fulfilled") {
      context.nodeResults.set(nodeId, result.value);
    } else {
      context.nodeResults.set(nodeId, {
        nodeId,
        status: "failed",
        error: result.reason,
        startTime: Date.now(),
        endTime: Date.now(),
      });
    }
  });
}

데이터 흐름

1. 소스 노드 실행

async function executeSourceNode(node: TableSourceNode): Promise<any[]> {
  const { tableName, schema, whereConditions } = node.data;

  // 데이터베이스 쿼리 실행
  const query = buildSelectQuery(tableName, schema, whereConditions);
  const data = await executeQuery(query);

  return data;
}

결과 예시:

[
  { "id": 1, "name": "김철수", "age": 30 },
  { "id": 2, "name": "이영희", "age": 25 },
  { "id": 3, "name": "박민수", "age": 35 }
]

2. 액션 노드 실행

데이터 전달 방식

async function executeNode(
  nodeId: string,
  nodes: FlowNode[],
  context: ExecutionContext
): Promise<NodeResult> {
  const node = nodes.find((n) => n.id === nodeId);
  const parents = getParentNodes(nodeId, edges);

  // 1⃣ 부모 노드 상태 확인
  const parentFailed = parents.some((p) => {
    const parentResult = context.nodeResults.get(p.id);
    return parentResult?.status === "failed";
  });

  if (parentFailed) {
    return {
      nodeId,
      status: "skipped",
      error: new Error("Parent node failed"),
      startTime: Date.now(),
      endTime: Date.now(),
    };
  }

  // 2⃣ 입력 데이터 준비
  const inputData = prepareInputData(node, parents, context);

  // 3⃣ 액션 실행 (독립 트랜잭션)
  return await executeActionWithTransaction(node, inputData);
}

입력 데이터 준비

function prepareInputData(
  node: FlowNode,
  parents: FlowNode[],
  context: ExecutionContext
): any {
  if (parents.length === 0) {
    // 소스 노드
    return null;
  } else if (parents.length === 1) {
    // 단일 부모: 부모의 결과 데이터 전달
    const parentResult = context.nodeResults.get(parents[0].id);
    return parentResult?.data || context.sourceData;
  } else {
    // 다중 부모: 모든 부모의 데이터 병합
    return parents.map((p) => {
      const result = context.nodeResults.get(p.id);
      return result?.data || context.sourceData;
    });
  }
}

3. 병렬 실행 예시

        TableSource
        (100개 레코드)
             ↓
      ┌──────┼──────┐
      ↓      ↓      ↓
   INSERT UPDATE DELETE
   (독립)  (독립)  (독립)

실행 과정:

// 1. TableSource 실행
const sourceData = await executeTableSource();
// → [100개 레코드]

// 2. 병렬 실행 (Promise.allSettled)
const results = await Promise.allSettled([
  executeInsertAction(insertNode, sourceData),
  executeUpdateAction(updateNode, sourceData),
  executeDeleteAction(deleteNode, sourceData),
]);

// 3. 각 액션은 독립 트랜잭션
//    - INSERT 실패 → INSERT만 롤백
//    - UPDATE 성공 → UPDATE 커밋
//    - DELETE 성공 → DELETE 커밋

4. 연쇄 실행 예시

        TableSource
             ↓
          INSERT
             ❌ (실패)
             ↓
          UPDATE-2
             ⏭️ (스킵)

실행 과정:

// 1. TableSource 실행
const sourceData = await executeTableSource();
// → 성공 ✅

// 2. INSERT 실행
const insertResult = await executeInsertAction(insertNode, sourceData);
// → 실패 ❌ (롤백됨)

// 3. UPDATE-2 실행 시도
const parentFailed = insertResult.status === "failed";
if (parentFailed) {
  return {
    status: "skipped",
    reason: "Parent INSERT failed",
  };
  // → 스킬 ⏭️
}

오류 처리

1. 독립 트랜잭션

각 액션 노드는 자체 트랜잭션을 가짐

async function executeActionWithTransaction(
  node: FlowNode,
  inputData: any
): Promise<NodeResult> {
  // 트랜잭션 시작
  const transaction = await db.beginTransaction();

  try {
    const result = await performAction(node, inputData, transaction);

    // 성공 시 커밋
    await transaction.commit();

    return {
      nodeId: node.id,
      status: "success",
      data: result,
      startTime: Date.now(),
      endTime: Date.now(),
    };
  } catch (error) {
    // 실패 시 롤백
    await transaction.rollback();

    return {
      nodeId: node.id,
      status: "failed",
      error: error,
      startTime: Date.now(),
      endTime: Date.now(),
    };
  }
}

2. 부분 실패 허용

// Promise.allSettled 사용
const results = await Promise.allSettled([action1(), action2(), action3()]);

// 결과 수집
const summary = {
  total: results.length,
  success: results.filter((r) => r.status === "fulfilled").length,
  failed: results.filter((r) => r.status === "rejected").length,
  details: results,
};

예시 결과:

{
  "total": 3,
  "success": 2,
  "failed": 1,
  "details": [
    { "status": "rejected", "reason": "Duplicate key error" },
    { "status": "fulfilled", "value": { "updatedCount": 100 } },
    { "status": "fulfilled", "value": { "deletedCount": 50 } }
  ]
}

3. 연쇄 중단

부모 노드 실패 시 자식 노드 자동 스킵

function shouldSkipNode(node: FlowNode, context: ExecutionContext): boolean {
  const parents = getParentNodes(node.id);

  return parents.some((parent) => {
    const parentResult = context.nodeResults.get(parent.id);
    return parentResult?.status === "failed";
  });
}

4. 오류 메시지

interface ExecutionError {
  nodeId: string;
  nodeName: string;
  errorType: "validation" | "execution" | "connection" | "timeout";
  message: string;
  details?: any;
  timestamp: number;
}

오류 메시지 예시:

{
  "nodeId": "insert-1",
  "nodeName": "INSERT 액션",
  "errorType": "execution",
  "message": "Duplicate key error: 'email' already exists",
  "details": {
    "table": "users",
    "constraint": "users_email_unique",
    "value": "test@example.com"
  },
  "timestamp": 1704182400000
}

구현 계획

Phase 1: 기본 실행 엔진 (우선순위: 높음)

작업 항목:

  1. 위상 정렬 알고리즘 구현
  2. 레벨별 실행 로직
  3. Promise.allSettled 기반 병렬 실행
  4. 독립 트랜잭션 처리
  5. 연쇄 중단 로직

예상 시간: 1일


Phase 2: 소스 노드 실행 (우선순위: 높음)

작업 항목:

  1. TableSource 실행기
  2. ExternalDBSource 실행기
  3. RestAPISource 실행기
  4. 데이터 캐싱

예상 시간: 1일


Phase 3: 액션 노드 실행 (우선순위: 높음)

작업 항목:

  1. INSERT 액션 실행기
  2. UPDATE 액션 실행기
  3. DELETE 액션 실행기
  4. UPSERT 액션 실행기
  5. 필드 매핑 적용

예상 시간: 2일


Phase 4: 변환 노드 실행 (우선순위: 중간)

작업 항목:

  1. FieldMapping 실행기
  2. DataTransform 실행기
  3. Condition 분기 처리

예상 시간: 1일


Phase 5: 오류 처리 및 모니터링 (우선순위: 중간)

작업 항목:

  1. 상세 오류 메시지
  2. 실행 결과 리포트
  3. 실행 로그 저장
  4. 실시간 진행 상태 표시

예상 시간: 1일


Phase 6: 최적화 (우선순위: 낮음)

작업 항목:

  1. 데이터 스트리밍 (대용량 데이터)
  2. 배치 처리 최적화
  3. 병렬 처리 튜닝
  4. 캐싱 전략

예상 시간: 2일


실행 결과 예시

성공 케이스

{
  "flowId": "flow-123",
  "flowName": "사용자 데이터 동기화",
  "status": "completed",
  "startTime": "2025-01-02T10:00:00Z",
  "endTime": "2025-01-02T10:00:05Z",
  "duration": 5000,
  "nodes": [
    {
      "nodeId": "source-1",
      "nodeName": "TableSource",
      "status": "success",
      "recordCount": 100,
      "duration": 500
    },
    {
      "nodeId": "insert-1",
      "nodeName": "INSERT",
      "status": "success",
      "insertedCount": 100,
      "duration": 2000
    },
    {
      "nodeId": "update-1",
      "nodeName": "UPDATE",
      "status": "success",
      "updatedCount": 80,
      "duration": 1500
    }
  ],
  "summary": {
    "total": 3,
    "success": 3,
    "failed": 0,
    "skipped": 0
  }
}

부분 실패 케이스

{
  "flowId": "flow-124",
  "flowName": "데이터 처리",
  "status": "partial_success",
  "startTime": "2025-01-02T11:00:00Z",
  "endTime": "2025-01-02T11:00:08Z",
  "duration": 8000,
  "nodes": [
    {
      "nodeId": "source-1",
      "nodeName": "TableSource",
      "status": "success",
      "recordCount": 100
    },
    {
      "nodeId": "insert-1",
      "nodeName": "INSERT",
      "status": "failed",
      "error": "Duplicate key error",
      "details": "email 'test@example.com' already exists"
    },
    {
      "nodeId": "update-2",
      "nodeName": "UPDATE-2",
      "status": "skipped",
      "reason": "Parent INSERT failed"
    },
    {
      "nodeId": "update-1",
      "nodeName": "UPDATE",
      "status": "success",
      "updatedCount": 50
    },
    {
      "nodeId": "delete-1",
      "nodeName": "DELETE",
      "status": "success",
      "deletedCount": 20
    }
  ],
  "summary": {
    "total": 5,
    "success": 3,
    "failed": 1,
    "skipped": 1
  }
}

다음 단계

  1. 데이터 처리 방식 확정 (완료)
  2. 실행 엔진 구현 시작
  3. 테스트 케이스 작성
  4. UI에서 실행 결과 표시

참고 문서: