# 플로우 관리 시스템 - 외부 연동 확장 계획 ## 개요 현재 플로우 관리 시스템은 내부 데이터베이스의 상태 변경만 지원합니다. 실제 업무 환경에서는 다음과 같은 외부 연동이 필요합니다: 1. **외부 데이터베이스**: 다른 DB 서버의 데이터 상태 변경 2. **REST API 호출**: 외부 시스템 API를 통한 상태 업데이트 3. **Webhook**: 외부 시스템으로 이벤트 전송 4. **복합 연동**: 내부 DB + 외부 API 동시 처리 --- ## 1. 데이터베이스 스키마 확장 ### 1.1 플로우 단계 설정 확장 ```sql -- flow_step 테이블에 외부 연동 설정 추가 ALTER TABLE flow_step ADD COLUMN integration_type VARCHAR(50); -- 값: 'internal' | 'external_db' | 'rest_api' | 'webhook' | 'hybrid' ALTER TABLE flow_step ADD COLUMN integration_config JSONB; -- 외부 연동 상세 설정 (JSON) COMMENT ON COLUMN flow_step.integration_type IS '연동 타입: internal/external_db/rest_api/webhook/hybrid'; COMMENT ON COLUMN flow_step.integration_config IS '외부 연동 설정 (JSON 형식)'; ``` ### 1.2 외부 연결 정보 관리 테이블 ```sql -- 외부 데이터베이스 연결 정보 CREATE TABLE external_db_connection ( id SERIAL PRIMARY KEY, name VARCHAR(100) NOT NULL UNIQUE, description TEXT, db_type VARCHAR(50) NOT NULL, -- 'postgresql' | 'mysql' | 'mssql' | 'oracle' host VARCHAR(255) NOT NULL, port INTEGER NOT NULL, database_name VARCHAR(100) NOT NULL, username VARCHAR(100) NOT NULL, password_encrypted TEXT NOT NULL, -- 암호화된 비밀번호 ssl_enabled BOOLEAN DEFAULT false, connection_options JSONB, -- 추가 연결 옵션 is_active BOOLEAN DEFAULT true, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); COMMENT ON TABLE external_db_connection IS '외부 데이터베이스 연결 정보'; -- 외부 API 연결 정보 CREATE TABLE external_api_connection ( id SERIAL PRIMARY KEY, name VARCHAR(100) NOT NULL UNIQUE, description TEXT, base_url VARCHAR(500) NOT NULL, auth_type VARCHAR(50), -- 'none' | 'basic' | 'bearer' | 'api_key' | 'oauth2' auth_config JSONB, -- 인증 설정 (암호화된 토큰/키 포함) default_headers JSONB, -- 기본 헤더 timeout_ms INTEGER DEFAULT 30000, retry_count INTEGER DEFAULT 3, is_active BOOLEAN DEFAULT true, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); COMMENT ON TABLE external_api_connection IS '외부 REST API 연결 정보'; ``` --- ## 2. integration_config JSON 스키마 ### 2.1 External DB 설정 ```json { "type": "external_db", "connectionId": 5, // external_db_connection.id "operation": "update", // 'update' | 'insert' | 'delete' | 'custom' "tableName": "external_orders", "updateFields": { "status": "approved", "approved_at": "NOW()", "approved_by": "{{currentUser}}" }, "whereCondition": { "id": "{{dataId}}", "company_code": "{{companyCode}}" }, "customQuery": null // operation이 'custom'인 경우 사용 } ``` ### 2.2 REST API 설정 ```json { "type": "rest_api", "connectionId": 3, // external_api_connection.id "method": "POST", // 'GET' | 'POST' | 'PUT' | 'PATCH' | 'DELETE' "endpoint": "/api/orders/{{dataId}}/approve", "headers": { "Content-Type": "application/json", "X-Request-ID": "{{generateUUID}}" }, "body": { "status": "approved", "approvedBy": "{{currentUser}}", "approvedAt": "{{currentTimestamp}}", "notes": "{{notes}}" }, "successCondition": { "statusCode": [200, 201], "responseField": "success", "expectedValue": true }, "errorHandling": { "onFailure": "rollback", // 'rollback' | 'continue' | 'retry' "maxRetries": 3, "retryDelay": 1000 } } ``` ### 2.3 Webhook 설정 ```json { "type": "webhook", "url": "https://external-system.com/webhooks/flow-status-change", "method": "POST", "headers": { "Content-Type": "application/json", "Authorization": "Bearer {{webhookToken}}" }, "payload": { "event": "flow.status.changed", "flowId": "{{flowId}}", "stepId": "{{stepId}}", "dataId": "{{dataId}}", "previousStatus": "{{previousStatus}}", "currentStatus": "{{currentStatus}}", "changedBy": "{{currentUser}}", "changedAt": "{{currentTimestamp}}" }, "async": true, // 비동기 처리 여부 "timeout": 5000 } ``` ### 2.4 Hybrid (복합) 설정 ```json { "type": "hybrid", "steps": [ { "order": 1, "name": "internal_db_update", "type": "internal", "config": { "tableName": "orders", "statusColumn": "order_status", "statusValue": "approved" }, "onError": "rollback" }, { "order": 2, "name": "notify_external_system", "type": "rest_api", "config": { "connectionId": 3, "method": "POST", "endpoint": "/api/notifications/order-approved", "body": { "orderId": "{{dataId}}", "status": "approved" } }, "onError": "log" // API 실패해도 계속 진행 }, { "order": 3, "name": "update_warehouse_system", "type": "external_db", "config": { "connectionId": 5, "operation": "update", "tableName": "warehouse_orders", "updateFields": { "status": "ready_to_ship" }, "whereCondition": { "order_ref": "{{dataId}}" } }, "onError": "rollback" } ], "transactionMode": "sequential", // 'sequential' | 'parallel' "rollbackStrategy": "all" // 'all' | 'completed_only' | 'none' } ``` --- ## 3. 백엔드 서비스 구조 ### 3.1 서비스 계층 구조 ``` flowDataMoveService (기존) └── FlowIntegrationService (신규) ├── InternalDbIntegration ├── ExternalDbIntegration ├── RestApiIntegration ├── WebhookIntegration └── HybridIntegration ``` ### 3.2 주요 인터페이스 ```typescript // 통합 인터페이스 interface FlowIntegration { execute(context: IntegrationContext): Promise; validate(config: any): ValidationResult; rollback(context: IntegrationContext): Promise; } // 실행 컨텍스트 interface IntegrationContext { flowId: number; stepId: number; dataId: string | number; tableName?: string; currentUser: string; variables: Record; // 템플릿 변수 transactionId?: string; } // 실행 결과 interface IntegrationResult { success: boolean; message?: string; data?: any; error?: { code: string; message: string; details?: any; }; rollbackInfo?: any; // 롤백을 위한 정보 } ``` ### 3.3 외부 DB 연동 서비스 ```typescript export class ExternalDbIntegration implements FlowIntegration { private connectionPool: Map = new Map(); async execute(context: IntegrationContext): Promise { const config = context.step.integrationConfig; // 1. 연결 정보 조회 const connection = await this.getConnection(config.connectionId); // 2. 쿼리 생성 (템플릿 변수 치환) const query = this.buildQuery(config, context); // 3. 실행 try { const result = await this.executeQuery(connection, query); return { success: true, data: result, rollbackInfo: { query: this.buildRollbackQuery(config, context), connection: config.connectionId, }, }; } catch (error) { return { success: false, error: { code: "EXTERNAL_DB_ERROR", message: error.message, details: error, }, }; } } async getConnection(connectionId: number) { // 연결 풀에서 가져오거나 새로 생성 if (this.connectionPool.has(connectionId)) { return this.connectionPool.get(connectionId); } const connInfo = await this.loadConnectionInfo(connectionId); const connection = await this.createConnection(connInfo); this.connectionPool.set(connectionId, connection); return connection; } private buildQuery(config: any, context: IntegrationContext): string { // 템플릿 변수 치환 const replacedConfig = this.replaceVariables(config, context); switch (config.operation) { case "update": return this.buildUpdateQuery(replacedConfig); case "insert": return this.buildInsertQuery(replacedConfig); case "delete": return this.buildDeleteQuery(replacedConfig); case "custom": return replacedConfig.customQuery; default: throw new Error(`Unsupported operation: ${config.operation}`); } } async rollback(context: IntegrationContext): Promise { const rollbackInfo = context.rollbackInfo; const connection = await this.getConnection(rollbackInfo.connection); await this.executeQuery(connection, rollbackInfo.query); } } ``` ### 3.4 REST API 연동 서비스 ```typescript export class RestApiIntegration implements FlowIntegration { private axiosInstances: Map = new Map(); async execute(context: IntegrationContext): Promise { const config = context.step.integrationConfig; // 1. API 클라이언트 생성 const client = await this.getApiClient(config.connectionId); // 2. 요청 구성 (템플릿 변수 치환) const request = this.buildRequest(config, context); // 3. API 호출 try { const response = await this.executeRequest(client, request); // 4. 성공 조건 검증 const isSuccess = this.validateSuccess(response, config.successCondition); if (isSuccess) { return { success: true, data: response.data, rollbackInfo: { compensatingRequest: this.buildCompensatingRequest( config, context, response ), }, }; } else { throw new Error("API call succeeded but validation failed"); } } catch (error) { // 에러 처리 및 재시도 return this.handleError(error, config, context); } } private async executeRequest( client: AxiosInstance, request: any ): Promise { const { method, endpoint, headers, body, timeout } = request; return await client.request({ method, url: endpoint, headers, data: body, timeout: timeout || 30000, }); } private async handleError( error: any, config: any, context: IntegrationContext ): Promise { const errorHandling = config.errorHandling; if (errorHandling.onFailure === "retry") { // 재시도 로직 for (let i = 0; i < errorHandling.maxRetries; i++) { await this.delay(errorHandling.retryDelay); try { return await this.execute(context); } catch (retryError) { if (i === errorHandling.maxRetries - 1) { throw retryError; } } } } return { success: false, error: { code: "REST_API_ERROR", message: error.message, details: error.response?.data, }, }; } async rollback(context: IntegrationContext): Promise { const rollbackInfo = context.rollbackInfo; if (rollbackInfo.compensatingRequest) { const client = await this.getApiClient(rollbackInfo.connectionId); await this.executeRequest(client, rollbackInfo.compensatingRequest); } } } ``` --- ## 4. 프론트엔드 UI 확장 ### 4.1 플로우 단계 설정 패널 확장 ```typescript // FlowStepPanel.tsx에 추가 // 연동 타입 선택 ; // 연동 타입별 설정 UI { integrationType === "external_db" && ( ); } { integrationType === "rest_api" && ( ); } ``` ### 4.2 외부 DB 설정 패널 ```typescript export function ExternalDbConfigPanel({ config, onChange }) { return (
{/* 연결 선택 */} {/* 작업 타입 */} {/* 테이블명 */} onChange({ ...config, tableName: e.target.value })} /> {/* 업데이트 필드 */} onChange({ ...config, updateFields: fields })} /> {/* WHERE 조건 */} onChange({ ...config, whereCondition: conditions }) } />
); } ``` ### 4.3 REST API 설정 패널 ```typescript export function RestApiConfigPanel({ config, onChange }) { return (
{/* API 연결 선택 */} {/* HTTP 메서드 */} {/* 엔드포인트 */} onChange({ ...config, endpoint: e.target.value })} /> {/* 헤더 */} onChange({ ...config, headers })} /> {/* 요청 본문 */} onChange({ ...config, body })} /> {/* 성공 조건 */} onChange({ ...config, successCondition: condition }) } />
); } ``` --- ## 5. 보안 고려사항 ### 5.1 자격 증명 암호화 ```typescript // 비밀번호/토큰 암호화 import crypto from "crypto"; export class CredentialEncryption { private algorithm = "aes-256-gcm"; private key: Buffer; constructor(secretKey: string) { this.key = crypto.scryptSync(secretKey, "salt", 32); } encrypt(text: string): string { const iv = crypto.randomBytes(16); const cipher = crypto.createCipheriv(this.algorithm, this.key, iv); let encrypted = cipher.update(text, "utf8", "hex"); encrypted += cipher.final("hex"); const authTag = cipher.getAuthTag(); return `${iv.toString("hex")}:${authTag.toString("hex")}:${encrypted}`; } decrypt(encrypted: string): string { const [ivHex, authTagHex, encryptedText] = encrypted.split(":"); const iv = Buffer.from(ivHex, "hex"); const authTag = Buffer.from(authTagHex, "hex"); const decipher = crypto.createDecipheriv(this.algorithm, this.key, iv); decipher.setAuthTag(authTag); let decrypted = decipher.update(encryptedText, "hex", "utf8"); decrypted += decipher.final("utf8"); return decrypted; } } ``` ### 5.2 권한 관리 ```sql -- 외부 연결 접근 권한 CREATE TABLE external_connection_permission ( id SERIAL PRIMARY KEY, connection_type VARCHAR(50) NOT NULL, -- 'db' | 'api' connection_id INTEGER NOT NULL, user_id INTEGER, role_id INTEGER, can_view BOOLEAN DEFAULT false, can_use BOOLEAN DEFAULT false, can_edit BOOLEAN DEFAULT false, can_delete BOOLEAN DEFAULT false, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); ``` --- ## 6. 모니터링 및 로깅 ### 6.1 외부 연동 로그 ```sql CREATE TABLE flow_integration_log ( id SERIAL PRIMARY KEY, flow_definition_id INTEGER NOT NULL, step_id INTEGER NOT NULL, data_id VARCHAR(100), integration_type VARCHAR(50) NOT NULL, connection_id INTEGER, request_payload JSONB, response_payload JSONB, status VARCHAR(50) NOT NULL, -- 'success' | 'failed' | 'timeout' | 'rollback' error_message TEXT, execution_time_ms INTEGER, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX idx_flow_integration_log_flow ON flow_integration_log(flow_definition_id); CREATE INDEX idx_flow_integration_log_status ON flow_integration_log(status); CREATE INDEX idx_flow_integration_log_created ON flow_integration_log(created_at); ``` --- ## 7. 구현 우선순위 ### Phase 1: 외부 DB 연동 (2-3주) 1. 외부 DB 연결 정보 관리 UI 2. ExternalDbIntegration 서비스 구현 3. 플로우 단계 설정에서 외부 DB 선택 기능 4. 테스트 및 검증 ### Phase 2: REST API 연동 (2-3주) 1. API 연결 정보 관리 UI 2. RestApiIntegration 서비스 구현 3. 템플릿 변수 시스템 구축 4. 재시도 및 에러 처리 ### Phase 3: 복합 연동 (2주) 1. HybridIntegration 서비스 구현 2. 트랜잭션 관리 및 롤백 3. UI에서 복합 시나리오 구성 ### Phase 4: 모니터링 및 최적화 (1-2주) 1. 로깅 시스템 구축 2. 성능 모니터링 대시보드 3. 알림 시스템 --- ## 8. 사용 예시 ### 예시 1: 주문 승인 시 외부 ERP 시스템 업데이트 ``` 플로우: 주문 승인 프로세스 ↓ 검토중 단계 ↓ 승인됨 단계 (외부 연동) - 내부 DB: orders.status = 'approved' - 외부 ERP API: POST /api/orders/approve { "orderId": "{{dataId}}", "approvedBy": "{{currentUser}}", "approvedAt": "{{timestamp}}" } - Webhook: 회계 시스템에 승인 알림 ``` ### 예시 2: 재고 이동 시 창고 관리 DB 업데이트 ``` 플로우: 재고 이동 프로세스 ↓ 이동 요청 단계 ↓ 이동 완료 단계 (외부 DB 연동) - 내부 DB: inventory_transfer.status = 'completed' - 외부 창고 DB: UPDATE warehouse_stock SET quantity = quantity - {{transferQty}} WHERE product_id = {{productId}} AND warehouse_id = {{fromWarehouse}} ``` --- ## 9. 기대 효과 1. **시스템 통합**: 여러 시스템 간 데이터 동기화 자동화 2. **업무 효율**: 수동 데이터 입력 감소 3. **실시간 연동**: 상태 변경 즉시 외부 시스템에 반영 4. **확장성**: 새로운 외부 시스템 쉽게 추가 5. **트랜잭션 보장**: 롤백 기능으로 데이터 일관성 유지 --- ## 10. 참고사항 - 외부 연동 설정은 관리자 권한 필요 - 모든 외부 호출은 로그 기록 - 타임아웃 및 재시도 정책 필수 설정 - 정기적인 연결 상태 모니터링 필요 - 보안을 위해 자격 증명은 반드시 암호화