# 노드 플로우 기능 개선 사항 > 작성일: 2024-12-08 > 상태: 분석 완료, 개선 대기 ## 현재 구현 상태 ### 잘 구현된 기능 | 기능 | 상태 | 설명 | |------|------|------| | 위상 정렬 실행 | 완료 | DAG 기반 레벨별 실행 | | 트랜잭션 관리 | 완료 | 전체 플로우 단일 트랜잭션, 실패 시 자동 롤백 | | 병렬 실행 | 완료 | 같은 레벨 노드 `Promise.allSettled`로 병렬 처리 | | CRUD 액션 | 완료 | INSERT, UPDATE, DELETE, UPSERT 지원 | | 외부 DB 연동 | 완료 | PostgreSQL, MySQL, MSSQL, Oracle 지원 | | REST API 연동 | 완료 | GET, POST, PUT, DELETE 지원 | | 조건 분기 | 완료 | 다양한 연산자 지원 | | 데이터 변환 | 부분 완료 | UPPERCASE, TRIM, EXPLODE 등 기본 변환 | | 집계 함수 | 완료 | SUM, COUNT, AVG, MIN, MAX, FIRST, LAST | ### 관련 파일 - **백엔드 실행 엔진**: `backend-node/src/services/nodeFlowExecutionService.ts` - **백엔드 라우트**: `backend-node/src/routes/dataflow/node-flows.ts` - **프론트엔드 API**: `frontend/lib/api/nodeFlows.ts` - **프론트엔드 에디터**: `frontend/components/dataflow/node-editor/FlowEditor.tsx` - **타입 정의**: `backend-node/src/types/flow.ts` --- ## 개선 필요 사항 ### 1. [우선순위 높음] 실행 이력 로깅 **현재 상태**: 플로우 실행 이력이 저장되지 않음 **문제점**: - 언제, 누가, 어떤 플로우를 실행했는지 추적 불가 - 실패 원인 분석 어려움 - 감사(Audit) 요구사항 충족 불가 **개선 방안**: ```sql -- db/migrations/XXX_add_node_flow_execution_log.sql CREATE TABLE node_flow_execution_log ( id SERIAL PRIMARY KEY, flow_id INTEGER NOT NULL REFERENCES node_flows(flow_id) ON DELETE CASCADE, execution_status VARCHAR(20) NOT NULL, -- 'success', 'failed', 'partial' execution_time_ms INTEGER, total_nodes INTEGER, success_nodes INTEGER, failed_nodes INTEGER, skipped_nodes INTEGER, executed_by VARCHAR(50), company_code VARCHAR(20), context_data JSONB, result_summary JSONB, error_message TEXT, created_at TIMESTAMP DEFAULT NOW() ); CREATE INDEX idx_flow_execution_log_flow_id ON node_flow_execution_log(flow_id); CREATE INDEX idx_flow_execution_log_created_at ON node_flow_execution_log(created_at DESC); CREATE INDEX idx_flow_execution_log_company_code ON node_flow_execution_log(company_code); ``` **필요 작업**: - [ ] 마이그레이션 파일 생성 - [ ] `nodeFlowExecutionService.ts`에 로그 저장 로직 추가 - [ ] 실행 이력 조회 API 추가 (`GET /api/dataflow/node-flows/:flowId/executions`) - [ ] 프론트엔드 실행 이력 UI 추가 --- ### 2. [우선순위 높음] 드라이런(Dry Run) 모드 **현재 상태**: 실제 데이터를 변경하지 않고 테스트할 방법 없음 **문제점**: - 프로덕션 데이터에 직접 영향 - 플로우 디버깅 어려움 - 신규 플로우 검증 불가 **개선 방안**: ```typescript // nodeFlowExecutionService.ts static async executeFlow( flowId: number, contextData: Record, options: { dryRun?: boolean } = {} ): Promise { if (options.dryRun) { // 트랜잭션 시작 후 항상 롤백 return transaction(async (client) => { const result = await this.executeFlowInternal(flowId, contextData, client); // 롤백을 위해 의도적으로 에러 발생 throw new DryRunComplete(result); }).catch((e) => { if (e instanceof DryRunComplete) { return { ...e.result, dryRun: true }; } throw e; }); } // 기존 로직... } ``` ```typescript // node-flows.ts 라우트 수정 router.post("/:flowId/execute", async (req, res) => { const dryRun = req.query.dryRun === 'true'; const result = await NodeFlowExecutionService.executeFlow( parseInt(flowId, 10), enrichedContextData, { dryRun } ); // ... }); ``` **필요 작업**: - [ ] `DryRunComplete` 예외 클래스 생성 - [ ] `executeFlow` 메서드에 `dryRun` 옵션 추가 - [ ] 라우트에 쿼리 파라미터 처리 추가 - [ ] 프론트엔드 "테스트 실행" 버튼 추가 --- ### 3. [우선순위 높음] 재시도 메커니즘 **현재 상태**: 외부 API/DB 호출 실패 시 재시도 없음 **문제점**: - 일시적 네트워크 오류로 전체 플로우 실패 - 외부 서비스 불안정 시 신뢰성 저하 **개선 방안**: ```typescript // utils/retry.ts export async function withRetry( fn: () => Promise, options: { maxRetries?: number; delay?: number; backoffMultiplier?: number; retryOn?: (error: any) => boolean; } = {} ): Promise { const { maxRetries = 3, delay = 1000, backoffMultiplier = 2, retryOn = () => true } = options; for (let attempt = 0; attempt < maxRetries; attempt++) { try { return await fn(); } catch (error) { if (attempt === maxRetries - 1 || !retryOn(error)) { throw error; } const waitTime = delay * Math.pow(backoffMultiplier, attempt); logger.warn(`재시도 ${attempt + 1}/${maxRetries}, ${waitTime}ms 후...`); await new Promise(r => setTimeout(r, waitTime)); } } throw new Error('재시도 횟수 초과'); } ``` ```typescript // nodeFlowExecutionService.ts에서 사용 const response = await withRetry( () => axios({ method, url, headers, data, timeout }), { maxRetries: 3, delay: 1000, retryOn: (err) => err.code === 'ECONNRESET' || err.response?.status >= 500 } ); ``` **필요 작업**: - [ ] `withRetry` 유틸리티 함수 생성 - [ ] REST API 호출 부분에 재시도 로직 적용 - [ ] 외부 DB 연결 부분에 재시도 로직 적용 - [ ] 노드별 재시도 설정 UI 추가 (선택사항) --- ### 4. [우선순위 높음] 미완성 데이터 변환 함수 **현재 상태**: FORMAT, CALCULATE, JSON_EXTRACT, CUSTOM 변환이 미구현 **문제점**: - 날짜/숫자 포맷팅 불가 - 계산식 처리 불가 - JSON 데이터 파싱 불가 **개선 방안**: ```typescript // nodeFlowExecutionService.ts - applyTransformation 메서드 수정 case "FORMAT": return rows.map((row) => { const value = row[sourceField]; let formatted = value; if (transform.formatType === 'date') { // dayjs 사용 formatted = dayjs(value).format(transform.formatPattern || 'YYYY-MM-DD'); } else if (transform.formatType === 'number') { // 숫자 포맷팅 const num = parseFloat(value); if (transform.formatPattern === 'currency') { formatted = num.toLocaleString('ko-KR', { style: 'currency', currency: 'KRW' }); } else if (transform.formatPattern === 'percent') { formatted = (num * 100).toFixed(transform.decimals || 0) + '%'; } else { formatted = num.toLocaleString('ko-KR', { maximumFractionDigits: transform.decimals || 2 }); } } return { ...row, [actualTargetField]: formatted }; }); case "CALCULATE": return rows.map((row) => { // 간단한 수식 평가 (보안 주의!) const expression = transform.expression; // 예: "price * quantity" const result = evaluateExpression(expression, row); return { ...row, [actualTargetField]: result }; }); case "JSON_EXTRACT": return rows.map((row) => { const jsonValue = typeof row[sourceField] === 'string' ? JSON.parse(row[sourceField]) : row[sourceField]; const extracted = jsonPath.query(jsonValue, transform.jsonPath); // JSONPath 라이브러리 사용 return { ...row, [actualTargetField]: extracted[0] || null }; }); ``` **필요 작업**: - [ ] `dayjs` 라이브러리 추가 (날짜 포맷팅) - [ ] `jsonpath` 라이브러리 추가 (JSON 추출) - [ ] 안전한 수식 평가 함수 구현 (eval 대신) - [ ] 각 변환 타입별 UI 설정 패널 추가 --- ### 5. [우선순위 중간] 플로우 버전 관리 **현재 상태**: 플로우 수정 시 이전 버전 덮어씀 **문제점**: - 실수로 수정한 플로우 복구 불가 - 변경 이력 추적 불가 **개선 방안**: ```sql -- db/migrations/XXX_add_node_flow_versions.sql CREATE TABLE node_flow_versions ( id SERIAL PRIMARY KEY, flow_id INTEGER NOT NULL REFERENCES node_flows(flow_id) ON DELETE CASCADE, version INTEGER NOT NULL, flow_data JSONB NOT NULL, change_description TEXT, created_by VARCHAR(50), created_at TIMESTAMP DEFAULT NOW(), UNIQUE(flow_id, version) ); CREATE INDEX idx_flow_versions_flow_id ON node_flow_versions(flow_id); ``` ```typescript // 플로우 수정 시 버전 저장 async function updateNodeFlow(flowId, flowData, changeDescription, userId) { // 현재 버전 조회 const currentVersion = await queryOne( 'SELECT COALESCE(MAX(version), 0) as max_version FROM node_flow_versions WHERE flow_id = $1', [flowId] ); // 새 버전 저장 await query( 'INSERT INTO node_flow_versions (flow_id, version, flow_data, change_description, created_by) VALUES ($1, $2, $3, $4, $5)', [flowId, currentVersion.max_version + 1, flowData, changeDescription, userId] ); // 기존 업데이트 로직... } ``` **필요 작업**: - [ ] 버전 테이블 마이그레이션 생성 - [ ] 플로우 수정 시 버전 자동 저장 - [ ] 버전 목록 조회 API (`GET /api/dataflow/node-flows/:flowId/versions`) - [ ] 특정 버전으로 롤백 API (`POST /api/dataflow/node-flows/:flowId/rollback/:version`) - [ ] 프론트엔드 버전 히스토리 UI --- ### 6. [우선순위 중간] 복합 조건 지원 **현재 상태**: 조건 노드에서 단일 조건만 지원 **문제점**: - 복잡한 비즈니스 로직 표현 불가 - 여러 조건을 AND/OR로 조합 불가 **개선 방안**: ```typescript // 복합 조건 타입 정의 interface ConditionGroup { type: 'AND' | 'OR'; conditions: (Condition | ConditionGroup)[]; } interface Condition { field: string; operator: string; value: any; } // 조건 평가 함수 수정 function evaluateConditionGroup(group: ConditionGroup, data: any): boolean { const results = group.conditions.map(condition => { if ('type' in condition) { // 중첩된 그룹 return evaluateConditionGroup(condition, data); } else { // 단일 조건 return evaluateCondition(data[condition.field], condition.operator, condition.value); } }); return group.type === 'AND' ? results.every(r => r) : results.some(r => r); } ``` **필요 작업**: - [ ] 복합 조건 타입 정의 - [ ] `evaluateConditionGroup` 함수 구현 - [ ] 조건 노드 속성 패널 UI 수정 (AND/OR 그룹 빌더) --- ### 7. [우선순위 중간] 비동기 실행 **현재 상태**: 동기 실행만 가능 (HTTP 요청 타임아웃 제한) **문제점**: - 대용량 데이터 처리 시 타임아웃 - 장시간 실행 플로우 처리 불가 **개선 방안**: ```sql -- 실행 큐 테이블 CREATE TABLE node_flow_execution_queue ( id SERIAL PRIMARY KEY, flow_id INTEGER NOT NULL REFERENCES node_flows(flow_id), execution_id UUID NOT NULL UNIQUE DEFAULT gen_random_uuid(), status VARCHAR(20) NOT NULL DEFAULT 'queued', -- queued, running, completed, failed context_data JSONB, callback_url TEXT, result JSONB, error_message TEXT, queued_by VARCHAR(50), company_code VARCHAR(20), queued_at TIMESTAMP DEFAULT NOW(), started_at TIMESTAMP, completed_at TIMESTAMP ); ``` ```typescript // 비동기 실행 API router.post("/:flowId/execute-async", async (req, res) => { const { callbackUrl, contextData } = req.body; // 큐에 추가 const execution = await queryOne( `INSERT INTO node_flow_execution_queue (flow_id, context_data, callback_url, queued_by, company_code) VALUES ($1, $2, $3, $4, $5) RETURNING execution_id`, [flowId, contextData, callbackUrl, req.user?.userId, req.user?.companyCode] ); // 백그라운드 워커가 처리 return res.json({ success: true, executionId: execution.execution_id, status: 'queued' }); }); // 상태 조회 API router.get("/executions/:executionId", async (req, res) => { const execution = await queryOne( 'SELECT * FROM node_flow_execution_queue WHERE execution_id = $1', [req.params.executionId] ); return res.json({ success: true, data: execution }); }); ``` **필요 작업**: - [ ] 실행 큐 테이블 마이그레이션 - [ ] 비동기 실행 API 추가 - [ ] 백그라운드 워커 프로세스 구현 (별도 프로세스 또는 Bull 큐) - [ ] 웹훅 콜백 기능 구현 - [ ] 프론트엔드 비동기 실행 상태 폴링 UI --- ### 8. [우선순위 낮음] 플로우 스케줄링 **현재 상태**: 수동 실행만 가능 **문제점**: - 정기적인 배치 작업 자동화 불가 - 특정 시간 예약 실행 불가 **개선 방안**: ```sql -- 스케줄 테이블 CREATE TABLE node_flow_schedules ( id SERIAL PRIMARY KEY, flow_id INTEGER NOT NULL REFERENCES node_flows(flow_id) ON DELETE CASCADE, schedule_name VARCHAR(100), cron_expression VARCHAR(50) NOT NULL, -- '0 9 * * 1-5' (평일 9시) context_data JSONB, is_active BOOLEAN DEFAULT true, last_run_at TIMESTAMP, next_run_at TIMESTAMP, created_by VARCHAR(50), company_code VARCHAR(20), created_at TIMESTAMP DEFAULT NOW(), updated_at TIMESTAMP DEFAULT NOW() ); ``` **필요 작업**: - [ ] 스케줄 테이블 마이그레이션 - [ ] 스케줄 CRUD API - [ ] node-cron 또는 Bull 스케줄러 통합 - [ ] 스케줄 관리 UI --- ### 9. [우선순위 낮음] 플러그인 아키텍처 **현재 상태**: 새 노드 타입 추가 시 `nodeFlowExecutionService.ts` 직접 수정 필요 **문제점**: - 코드 복잡도 증가 - 확장성 제한 **개선 방안**: ```typescript // interfaces/NodeHandler.ts export interface NodeHandler { type: string; execute(node: FlowNode, inputData: any, context: ExecutionContext, client?: any): Promise; validate?(node: FlowNode): { valid: boolean; errors: string[] }; } // handlers/InsertActionHandler.ts export class InsertActionHandler implements NodeHandler { type = 'insertAction'; async execute(node, inputData, context, client) { // 기존 executeInsertAction 로직 } } // NodeHandlerRegistry.ts class NodeHandlerRegistry { private handlers = new Map(); register(handler: NodeHandler) { this.handlers.set(handler.type, handler); } get(type: string): NodeHandler | undefined { return this.handlers.get(type); } } // 사용 const registry = new NodeHandlerRegistry(); registry.register(new InsertActionHandler()); registry.register(new UpdateActionHandler()); // ... // executeNodeByType에서 const handler = registry.get(node.type); if (handler) { return handler.execute(node, inputData, context, client); } ``` **필요 작업**: - [ ] `NodeHandler` 인터페이스 정의 - [ ] 기존 노드 타입별 핸들러 클래스 분리 - [ ] `NodeHandlerRegistry` 구현 - [ ] 커스텀 노드 핸들러 등록 메커니즘 --- ### 10. [우선순위 낮음] 프론트엔드 연동 강화 **현재 상태**: 기본 에디터 구현됨 **개선 필요 항목**: - [ ] 실행 결과 시각화 (노드별 성공/실패 표시) - [ ] 실시간 실행 진행률 표시 - [ ] 드라이런 모드 UI - [ ] 실행 이력 조회 UI - [ ] 버전 히스토리 UI - [ ] 노드 검증 결과 표시 --- ## 프론트엔드 컴포넌트 CRUD 로직 이전 계획 현재 프론트엔드 컴포넌트에서 직접 CRUD를 수행하는 코드들을 노드 플로우로 이전해야 합니다. ### 이전 대상 컴포넌트 | 컴포넌트 | 파일 위치 | 현재 로직 | 이전 우선순위 | |----------|----------|----------|--------------| | SplitPanelLayoutComponent | `frontend/lib/registry/components/split-panel-layout/` | createRecord, updateRecord, deleteRecord | 높음 | | RepeatScreenModalComponent | `frontend/lib/registry/components/repeat-screen-modal/` | 다중 테이블 INSERT/UPDATE/DELETE | 높음 | | UniversalFormModalComponent | `frontend/lib/registry/components/universal-form-modal/` | 다중 행 저장 | 높음 | | SelectedItemsDetailInputComponent | `frontend/lib/registry/components/selected-items-detail-input/` | upsertGroupedRecords | 높음 | | ButtonPrimaryComponent | `frontend/lib/registry/components/button-primary/` | 상태 변경 POST | 중간 | | SimpleRepeaterTableComponent | `frontend/lib/registry/components/simple-repeater-table/` | 데이터 저장 POST | 중간 | ### 이전 방식 1. **플로우 생성**: 각 컴포넌트의 저장 로직을 노드 플로우로 구현 2. **프론트엔드 수정**: 직접 API 호출 대신 `executeNodeFlow(flowId, contextData)` 호출 3. **화면 설정에 플로우 연결**: 버튼 액션에 실행할 플로우 ID 설정 ```typescript // 현재 (프론트엔드에서 직접 호출) const result = await dataApi.createRecord(tableName, data); // 개선 후 (플로우 실행) const result = await executeNodeFlow(flowId, { formData: data, tableName: tableName, action: 'create' }); ``` --- ## 참고 자료 - 노드 플로우 실행 엔진: `backend-node/src/services/nodeFlowExecutionService.ts` - 플로우 타입 정의: `backend-node/src/types/flow.ts` - 프론트엔드 플로우 에디터: `frontend/components/dataflow/node-editor/FlowEditor.tsx` - 프론트엔드 플로우 API: `frontend/lib/api/nodeFlows.ts`