ERP-node/docs/FLOW_EXTERNAL_INTEGRATION_P...

20 KiB

플로우 관리 시스템 - 외부 연동 확장 계획

개요

현재 플로우 관리 시스템은 내부 데이터베이스의 상태 변경만 지원합니다. 실제 업무 환경에서는 다음과 같은 외부 연동이 필요합니다:

  1. 외부 데이터베이스: 다른 DB 서버의 데이터 상태 변경
  2. REST API 호출: 외부 시스템 API를 통한 상태 업데이트
  3. Webhook: 외부 시스템으로 이벤트 전송
  4. 복합 연동: 내부 DB + 외부 API 동시 처리

1. 데이터베이스 스키마 확장

1.1 플로우 단계 설정 확장

-- flow_step 테이블에 외부 연동 설정 추가
ALTER TABLE flow_step ADD COLUMN integration_type VARCHAR(50);
-- 값: 'internal' | 'external_db' | 'rest_api' | 'webhook' | 'hybrid'

ALTER TABLE flow_step ADD COLUMN integration_config JSONB;
-- 외부 연동 상세 설정 (JSON)

COMMENT ON COLUMN flow_step.integration_type IS '연동 타입: internal/external_db/rest_api/webhook/hybrid';
COMMENT ON COLUMN flow_step.integration_config IS '외부 연동 설정 (JSON 형식)';

1.2 외부 연결 정보 관리 테이블

-- 외부 데이터베이스 연결 정보
CREATE TABLE external_db_connection (
  id SERIAL PRIMARY KEY,
  name VARCHAR(100) NOT NULL UNIQUE,
  description TEXT,
  db_type VARCHAR(50) NOT NULL, -- 'postgresql' | 'mysql' | 'mssql' | 'oracle'
  host VARCHAR(255) NOT NULL,
  port INTEGER NOT NULL,
  database_name VARCHAR(100) NOT NULL,
  username VARCHAR(100) NOT NULL,
  password_encrypted TEXT NOT NULL, -- 암호화된 비밀번호
  ssl_enabled BOOLEAN DEFAULT false,
  connection_options JSONB, -- 추가 연결 옵션
  is_active BOOLEAN DEFAULT true,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

COMMENT ON TABLE external_db_connection IS '외부 데이터베이스 연결 정보';

-- 외부 API 연결 정보
CREATE TABLE external_api_connection (
  id SERIAL PRIMARY KEY,
  name VARCHAR(100) NOT NULL UNIQUE,
  description TEXT,
  base_url VARCHAR(500) NOT NULL,
  auth_type VARCHAR(50), -- 'none' | 'basic' | 'bearer' | 'api_key' | 'oauth2'
  auth_config JSONB, -- 인증 설정 (암호화된 토큰/키 포함)
  default_headers JSONB, -- 기본 헤더
  timeout_ms INTEGER DEFAULT 30000,
  retry_count INTEGER DEFAULT 3,
  is_active BOOLEAN DEFAULT true,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

COMMENT ON TABLE external_api_connection IS '외부 REST API 연결 정보';

2. integration_config JSON 스키마

2.1 External DB 설정

{
  "type": "external_db",
  "connectionId": 5, // external_db_connection.id
  "operation": "update", // 'update' | 'insert' | 'delete' | 'custom'
  "tableName": "external_orders",
  "updateFields": {
    "status": "approved",
    "approved_at": "NOW()",
    "approved_by": "{{currentUser}}"
  },
  "whereCondition": {
    "id": "{{dataId}}",
    "company_code": "{{companyCode}}"
  },
  "customQuery": null // operation이 'custom'인 경우 사용
}

2.2 REST API 설정

{
  "type": "rest_api",
  "connectionId": 3, // external_api_connection.id
  "method": "POST", // 'GET' | 'POST' | 'PUT' | 'PATCH' | 'DELETE'
  "endpoint": "/api/orders/{{dataId}}/approve",
  "headers": {
    "Content-Type": "application/json",
    "X-Request-ID": "{{generateUUID}}"
  },
  "body": {
    "status": "approved",
    "approvedBy": "{{currentUser}}",
    "approvedAt": "{{currentTimestamp}}",
    "notes": "{{notes}}"
  },
  "successCondition": {
    "statusCode": [200, 201],
    "responseField": "success",
    "expectedValue": true
  },
  "errorHandling": {
    "onFailure": "rollback", // 'rollback' | 'continue' | 'retry'
    "maxRetries": 3,
    "retryDelay": 1000
  }
}

2.3 Webhook 설정

{
  "type": "webhook",
  "url": "https://external-system.com/webhooks/flow-status-change",
  "method": "POST",
  "headers": {
    "Content-Type": "application/json",
    "Authorization": "Bearer {{webhookToken}}"
  },
  "payload": {
    "event": "flow.status.changed",
    "flowId": "{{flowId}}",
    "stepId": "{{stepId}}",
    "dataId": "{{dataId}}",
    "previousStatus": "{{previousStatus}}",
    "currentStatus": "{{currentStatus}}",
    "changedBy": "{{currentUser}}",
    "changedAt": "{{currentTimestamp}}"
  },
  "async": true, // 비동기 처리 여부
  "timeout": 5000
}

2.4 Hybrid (복합) 설정

{
  "type": "hybrid",
  "steps": [
    {
      "order": 1,
      "name": "internal_db_update",
      "type": "internal",
      "config": {
        "tableName": "orders",
        "statusColumn": "order_status",
        "statusValue": "approved"
      },
      "onError": "rollback"
    },
    {
      "order": 2,
      "name": "notify_external_system",
      "type": "rest_api",
      "config": {
        "connectionId": 3,
        "method": "POST",
        "endpoint": "/api/notifications/order-approved",
        "body": {
          "orderId": "{{dataId}}",
          "status": "approved"
        }
      },
      "onError": "log" // API 실패해도 계속 진행
    },
    {
      "order": 3,
      "name": "update_warehouse_system",
      "type": "external_db",
      "config": {
        "connectionId": 5,
        "operation": "update",
        "tableName": "warehouse_orders",
        "updateFields": {
          "status": "ready_to_ship"
        },
        "whereCondition": {
          "order_ref": "{{dataId}}"
        }
      },
      "onError": "rollback"
    }
  ],
  "transactionMode": "sequential", // 'sequential' | 'parallel'
  "rollbackStrategy": "all" // 'all' | 'completed_only' | 'none'
}

3. 백엔드 서비스 구조

3.1 서비스 계층 구조

flowDataMoveService (기존)
  └── FlowIntegrationService (신규)
      ├── InternalDbIntegration
      ├── ExternalDbIntegration
      ├── RestApiIntegration
      ├── WebhookIntegration
      └── HybridIntegration

3.2 주요 인터페이스

// 통합 인터페이스
interface FlowIntegration {
  execute(context: IntegrationContext): Promise<IntegrationResult>;
  validate(config: any): ValidationResult;
  rollback(context: IntegrationContext): Promise<void>;
}

// 실행 컨텍스트
interface IntegrationContext {
  flowId: number;
  stepId: number;
  dataId: string | number;
  tableName?: string;
  currentUser: string;
  variables: Record<string, any>; // 템플릿 변수
  transactionId?: string;
}

// 실행 결과
interface IntegrationResult {
  success: boolean;
  message?: string;
  data?: any;
  error?: {
    code: string;
    message: string;
    details?: any;
  };
  rollbackInfo?: any; // 롤백을 위한 정보
}

3.3 외부 DB 연동 서비스

export class ExternalDbIntegration implements FlowIntegration {
  private connectionPool: Map<number, any> = new Map();

  async execute(context: IntegrationContext): Promise<IntegrationResult> {
    const config = context.step.integrationConfig;

    // 1. 연결 정보 조회
    const connection = await this.getConnection(config.connectionId);

    // 2. 쿼리 생성 (템플릿 변수 치환)
    const query = this.buildQuery(config, context);

    // 3. 실행
    try {
      const result = await this.executeQuery(connection, query);

      return {
        success: true,
        data: result,
        rollbackInfo: {
          query: this.buildRollbackQuery(config, context),
          connection: config.connectionId,
        },
      };
    } catch (error) {
      return {
        success: false,
        error: {
          code: "EXTERNAL_DB_ERROR",
          message: error.message,
          details: error,
        },
      };
    }
  }

  async getConnection(connectionId: number) {
    // 연결 풀에서 가져오거나 새로 생성
    if (this.connectionPool.has(connectionId)) {
      return this.connectionPool.get(connectionId);
    }

    const connInfo = await this.loadConnectionInfo(connectionId);
    const connection = await this.createConnection(connInfo);
    this.connectionPool.set(connectionId, connection);

    return connection;
  }

  private buildQuery(config: any, context: IntegrationContext): string {
    // 템플릿 변수 치환
    const replacedConfig = this.replaceVariables(config, context);

    switch (config.operation) {
      case "update":
        return this.buildUpdateQuery(replacedConfig);
      case "insert":
        return this.buildInsertQuery(replacedConfig);
      case "delete":
        return this.buildDeleteQuery(replacedConfig);
      case "custom":
        return replacedConfig.customQuery;
      default:
        throw new Error(`Unsupported operation: ${config.operation}`);
    }
  }

  async rollback(context: IntegrationContext): Promise<void> {
    const rollbackInfo = context.rollbackInfo;
    const connection = await this.getConnection(rollbackInfo.connection);
    await this.executeQuery(connection, rollbackInfo.query);
  }
}

3.4 REST API 연동 서비스

export class RestApiIntegration implements FlowIntegration {
  private axiosInstances: Map<number, AxiosInstance> = new Map();

  async execute(context: IntegrationContext): Promise<IntegrationResult> {
    const config = context.step.integrationConfig;

    // 1. API 클라이언트 생성
    const client = await this.getApiClient(config.connectionId);

    // 2. 요청 구성 (템플릿 변수 치환)
    const request = this.buildRequest(config, context);

    // 3. API 호출
    try {
      const response = await this.executeRequest(client, request);

      // 4. 성공 조건 검증
      const isSuccess = this.validateSuccess(response, config.successCondition);

      if (isSuccess) {
        return {
          success: true,
          data: response.data,
          rollbackInfo: {
            compensatingRequest: this.buildCompensatingRequest(
              config,
              context,
              response
            ),
          },
        };
      } else {
        throw new Error("API call succeeded but validation failed");
      }
    } catch (error) {
      // 에러 처리 및 재시도
      return this.handleError(error, config, context);
    }
  }

  private async executeRequest(
    client: AxiosInstance,
    request: any
  ): Promise<AxiosResponse> {
    const { method, endpoint, headers, body, timeout } = request;

    return await client.request({
      method,
      url: endpoint,
      headers,
      data: body,
      timeout: timeout || 30000,
    });
  }

  private async handleError(
    error: any,
    config: any,
    context: IntegrationContext
  ): Promise<IntegrationResult> {
    const errorHandling = config.errorHandling;

    if (errorHandling.onFailure === "retry") {
      // 재시도 로직
      for (let i = 0; i < errorHandling.maxRetries; i++) {
        await this.delay(errorHandling.retryDelay);
        try {
          return await this.execute(context);
        } catch (retryError) {
          if (i === errorHandling.maxRetries - 1) {
            throw retryError;
          }
        }
      }
    }

    return {
      success: false,
      error: {
        code: "REST_API_ERROR",
        message: error.message,
        details: error.response?.data,
      },
    };
  }

  async rollback(context: IntegrationContext): Promise<void> {
    const rollbackInfo = context.rollbackInfo;
    if (rollbackInfo.compensatingRequest) {
      const client = await this.getApiClient(rollbackInfo.connectionId);
      await this.executeRequest(client, rollbackInfo.compensatingRequest);
    }
  }
}

4. 프론트엔드 UI 확장

4.1 플로우 단계 설정 패널 확장

// FlowStepPanel.tsx에 추가

// 연동 타입 선택
<Select value={integrationType} onValueChange={setIntegrationType}>
  <SelectItem value="internal">내부 DB</SelectItem>
  <SelectItem value="external_db">외부 DB</SelectItem>
  <SelectItem value="rest_api">REST API</SelectItem>
  <SelectItem value="webhook">Webhook</SelectItem>
  <SelectItem value="hybrid">복합 연동</SelectItem>
</Select>;

// 연동 타입별 설정 UI
{
  integrationType === "external_db" && (
    <ExternalDbConfigPanel
      config={integrationConfig}
      onChange={setIntegrationConfig}
    />
  );
}

{
  integrationType === "rest_api" && (
    <RestApiConfigPanel
      config={integrationConfig}
      onChange={setIntegrationConfig}
    />
  );
}

4.2 외부 DB 설정 패널

export function ExternalDbConfigPanel({ config, onChange }) {
  return (
    <div className="space-y-4">
      {/* 연결 선택 */}
      <Select value={config.connectionId}>
        <SelectLabel>외부 DB 연결</SelectLabel>
        {externalConnections.map((conn) => (
          <SelectItem key={conn.id} value={conn.id}>
            {conn.name} ({conn.dbType})
          </SelectItem>
        ))}
      </Select>

      {/* 작업 타입 */}
      <Select value={config.operation}>
        <SelectItem value="update">업데이트</SelectItem>
        <SelectItem value="insert">삽입</SelectItem>
        <SelectItem value="delete">삭제</SelectItem>
        <SelectItem value="custom">커스텀 쿼리</SelectItem>
      </Select>

      {/* 테이블명 */}
      <Input
        label="테이블명"
        value={config.tableName}
        onChange={(e) => onChange({ ...config, tableName: e.target.value })}
      />

      {/* 업데이트 필드 */}
      <FieldMapper
        fields={config.updateFields}
        onChange={(fields) => onChange({ ...config, updateFields: fields })}
      />

      {/* WHERE 조건 */}
      <ConditionBuilder
        conditions={config.whereCondition}
        onChange={(conditions) =>
          onChange({ ...config, whereCondition: conditions })
        }
      />
    </div>
  );
}

4.3 REST API 설정 패널

export function RestApiConfigPanel({ config, onChange }) {
  return (
    <div className="space-y-4">
      {/* API 연결 선택 */}
      <Select value={config.connectionId}>
        <SelectLabel>API 연결</SelectLabel>
        {apiConnections.map((conn) => (
          <SelectItem key={conn.id} value={conn.id}>
            {conn.name} ({conn.baseUrl})
          </SelectItem>
        ))}
      </Select>

      {/* HTTP 메서드 */}
      <Select value={config.method}>
        <SelectItem value="GET">GET</SelectItem>
        <SelectItem value="POST">POST</SelectItem>
        <SelectItem value="PUT">PUT</SelectItem>
        <SelectItem value="PATCH">PATCH</SelectItem>
        <SelectItem value="DELETE">DELETE</SelectItem>
      </Select>

      {/* 엔드포인트 */}
      <Input
        label="엔드포인트"
        placeholder="/api/orders/{{dataId}}/approve"
        value={config.endpoint}
        onChange={(e) => onChange({ ...config, endpoint: e.target.value })}
      />

      {/* 헤더 */}
      <KeyValueEditor
        label="헤더"
        data={config.headers}
        onChange={(headers) => onChange({ ...config, headers })}
      />

      {/* 요청 본문 */}
      <JsonEditor
        label="요청 본문"
        value={config.body}
        onChange={(body) => onChange({ ...config, body })}
      />

      {/* 성공 조건 */}
      <SuccessConditionEditor
        condition={config.successCondition}
        onChange={(condition) =>
          onChange({ ...config, successCondition: condition })
        }
      />
    </div>
  );
}

5. 보안 고려사항

5.1 자격 증명 암호화

// 비밀번호/토큰 암호화
import crypto from "crypto";

export class CredentialEncryption {
  private algorithm = "aes-256-gcm";
  private key: Buffer;

  constructor(secretKey: string) {
    this.key = crypto.scryptSync(secretKey, "salt", 32);
  }

  encrypt(text: string): string {
    const iv = crypto.randomBytes(16);
    const cipher = crypto.createCipheriv(this.algorithm, this.key, iv);

    let encrypted = cipher.update(text, "utf8", "hex");
    encrypted += cipher.final("hex");

    const authTag = cipher.getAuthTag();

    return `${iv.toString("hex")}:${authTag.toString("hex")}:${encrypted}`;
  }

  decrypt(encrypted: string): string {
    const [ivHex, authTagHex, encryptedText] = encrypted.split(":");

    const iv = Buffer.from(ivHex, "hex");
    const authTag = Buffer.from(authTagHex, "hex");
    const decipher = crypto.createDecipheriv(this.algorithm, this.key, iv);

    decipher.setAuthTag(authTag);

    let decrypted = decipher.update(encryptedText, "hex", "utf8");
    decrypted += decipher.final("utf8");

    return decrypted;
  }
}

5.2 권한 관리

-- 외부 연결 접근 권한
CREATE TABLE external_connection_permission (
  id SERIAL PRIMARY KEY,
  connection_type VARCHAR(50) NOT NULL, -- 'db' | 'api'
  connection_id INTEGER NOT NULL,
  user_id INTEGER,
  role_id INTEGER,
  can_view BOOLEAN DEFAULT false,
  can_use BOOLEAN DEFAULT false,
  can_edit BOOLEAN DEFAULT false,
  can_delete BOOLEAN DEFAULT false,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

6. 모니터링 및 로깅

6.1 외부 연동 로그

CREATE TABLE flow_integration_log (
  id SERIAL PRIMARY KEY,
  flow_definition_id INTEGER NOT NULL,
  step_id INTEGER NOT NULL,
  data_id VARCHAR(100),
  integration_type VARCHAR(50) NOT NULL,
  connection_id INTEGER,
  request_payload JSONB,
  response_payload JSONB,
  status VARCHAR(50) NOT NULL, -- 'success' | 'failed' | 'timeout' | 'rollback'
  error_message TEXT,
  execution_time_ms INTEGER,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_flow_integration_log_flow ON flow_integration_log(flow_definition_id);
CREATE INDEX idx_flow_integration_log_status ON flow_integration_log(status);
CREATE INDEX idx_flow_integration_log_created ON flow_integration_log(created_at);

7. 구현 우선순위

Phase 1: 외부 DB 연동 (2-3주)

  1. 외부 DB 연결 정보 관리 UI
  2. ExternalDbIntegration 서비스 구현
  3. 플로우 단계 설정에서 외부 DB 선택 기능
  4. 테스트 및 검증

Phase 2: REST API 연동 (2-3주)

  1. API 연결 정보 관리 UI
  2. RestApiIntegration 서비스 구현
  3. 템플릿 변수 시스템 구축
  4. 재시도 및 에러 처리

Phase 3: 복합 연동 (2주)

  1. HybridIntegration 서비스 구현
  2. 트랜잭션 관리 및 롤백
  3. UI에서 복합 시나리오 구성

Phase 4: 모니터링 및 최적화 (1-2주)

  1. 로깅 시스템 구축
  2. 성능 모니터링 대시보드
  3. 알림 시스템

8. 사용 예시

예시 1: 주문 승인 시 외부 ERP 시스템 업데이트

플로우: 주문 승인 프로세스
  ↓
검토중 단계
  ↓
승인됨 단계 (외부 연동)
  - 내부 DB: orders.status = 'approved'
  - 외부 ERP API: POST /api/orders/approve
    {
      "orderId": "{{dataId}}",
      "approvedBy": "{{currentUser}}",
      "approvedAt": "{{timestamp}}"
    }
  - Webhook: 회계 시스템에 승인 알림

예시 2: 재고 이동 시 창고 관리 DB 업데이트

플로우: 재고 이동 프로세스
  ↓
이동 요청 단계
  ↓
이동 완료 단계 (외부 DB 연동)
  - 내부 DB: inventory_transfer.status = 'completed'
  - 외부 창고 DB:
    UPDATE warehouse_stock
    SET quantity = quantity - {{transferQty}}
    WHERE product_id = {{productId}}
      AND warehouse_id = {{fromWarehouse}}

9. 기대 효과

  1. 시스템 통합: 여러 시스템 간 데이터 동기화 자동화
  2. 업무 효율: 수동 데이터 입력 감소
  3. 실시간 연동: 상태 변경 즉시 외부 시스템에 반영
  4. 확장성: 새로운 외부 시스템 쉽게 추가
  5. 트랜잭션 보장: 롤백 기능으로 데이터 일관성 유지

10. 참고사항

  • 외부 연동 설정은 관리자 권한 필요
  • 모든 외부 호출은 로그 기록
  • 타임아웃 및 재시도 정책 필수 설정
  • 정기적인 연결 상태 모니터링 필요
  • 보안을 위해 자격 증명은 반드시 암호화