20 KiB
20 KiB
플로우 관리 시스템 - 외부 연동 확장 계획
개요
현재 플로우 관리 시스템은 내부 데이터베이스의 상태 변경만 지원합니다. 실제 업무 환경에서는 다음과 같은 외부 연동이 필요합니다:
- 외부 데이터베이스: 다른 DB 서버의 데이터 상태 변경
- REST API 호출: 외부 시스템 API를 통한 상태 업데이트
- Webhook: 외부 시스템으로 이벤트 전송
- 복합 연동: 내부 DB + 외부 API 동시 처리
1. 데이터베이스 스키마 확장
1.1 플로우 단계 설정 확장
-- flow_step 테이블에 외부 연동 설정 추가
ALTER TABLE flow_step ADD COLUMN integration_type VARCHAR(50);
-- 값: 'internal' | 'external_db' | 'rest_api' | 'webhook' | 'hybrid'
ALTER TABLE flow_step ADD COLUMN integration_config JSONB;
-- 외부 연동 상세 설정 (JSON)
COMMENT ON COLUMN flow_step.integration_type IS '연동 타입: internal/external_db/rest_api/webhook/hybrid';
COMMENT ON COLUMN flow_step.integration_config IS '외부 연동 설정 (JSON 형식)';
1.2 외부 연결 정보 관리 테이블
-- 외부 데이터베이스 연결 정보
CREATE TABLE external_db_connection (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL UNIQUE,
description TEXT,
db_type VARCHAR(50) NOT NULL, -- 'postgresql' | 'mysql' | 'mssql' | 'oracle'
host VARCHAR(255) NOT NULL,
port INTEGER NOT NULL,
database_name VARCHAR(100) NOT NULL,
username VARCHAR(100) NOT NULL,
password_encrypted TEXT NOT NULL, -- 암호화된 비밀번호
ssl_enabled BOOLEAN DEFAULT false,
connection_options JSONB, -- 추가 연결 옵션
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
COMMENT ON TABLE external_db_connection IS '외부 데이터베이스 연결 정보';
-- 외부 API 연결 정보
CREATE TABLE external_api_connection (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL UNIQUE,
description TEXT,
base_url VARCHAR(500) NOT NULL,
auth_type VARCHAR(50), -- 'none' | 'basic' | 'bearer' | 'api_key' | 'oauth2'
auth_config JSONB, -- 인증 설정 (암호화된 토큰/키 포함)
default_headers JSONB, -- 기본 헤더
timeout_ms INTEGER DEFAULT 30000,
retry_count INTEGER DEFAULT 3,
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
COMMENT ON TABLE external_api_connection IS '외부 REST API 연결 정보';
2. integration_config JSON 스키마
2.1 External DB 설정
{
"type": "external_db",
"connectionId": 5, // external_db_connection.id
"operation": "update", // 'update' | 'insert' | 'delete' | 'custom'
"tableName": "external_orders",
"updateFields": {
"status": "approved",
"approved_at": "NOW()",
"approved_by": "{{currentUser}}"
},
"whereCondition": {
"id": "{{dataId}}",
"company_code": "{{companyCode}}"
},
"customQuery": null // operation이 'custom'인 경우 사용
}
2.2 REST API 설정
{
"type": "rest_api",
"connectionId": 3, // external_api_connection.id
"method": "POST", // 'GET' | 'POST' | 'PUT' | 'PATCH' | 'DELETE'
"endpoint": "/api/orders/{{dataId}}/approve",
"headers": {
"Content-Type": "application/json",
"X-Request-ID": "{{generateUUID}}"
},
"body": {
"status": "approved",
"approvedBy": "{{currentUser}}",
"approvedAt": "{{currentTimestamp}}",
"notes": "{{notes}}"
},
"successCondition": {
"statusCode": [200, 201],
"responseField": "success",
"expectedValue": true
},
"errorHandling": {
"onFailure": "rollback", // 'rollback' | 'continue' | 'retry'
"maxRetries": 3,
"retryDelay": 1000
}
}
2.3 Webhook 설정
{
"type": "webhook",
"url": "https://external-system.com/webhooks/flow-status-change",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer {{webhookToken}}"
},
"payload": {
"event": "flow.status.changed",
"flowId": "{{flowId}}",
"stepId": "{{stepId}}",
"dataId": "{{dataId}}",
"previousStatus": "{{previousStatus}}",
"currentStatus": "{{currentStatus}}",
"changedBy": "{{currentUser}}",
"changedAt": "{{currentTimestamp}}"
},
"async": true, // 비동기 처리 여부
"timeout": 5000
}
2.4 Hybrid (복합) 설정
{
"type": "hybrid",
"steps": [
{
"order": 1,
"name": "internal_db_update",
"type": "internal",
"config": {
"tableName": "orders",
"statusColumn": "order_status",
"statusValue": "approved"
},
"onError": "rollback"
},
{
"order": 2,
"name": "notify_external_system",
"type": "rest_api",
"config": {
"connectionId": 3,
"method": "POST",
"endpoint": "/api/notifications/order-approved",
"body": {
"orderId": "{{dataId}}",
"status": "approved"
}
},
"onError": "log" // API 실패해도 계속 진행
},
{
"order": 3,
"name": "update_warehouse_system",
"type": "external_db",
"config": {
"connectionId": 5,
"operation": "update",
"tableName": "warehouse_orders",
"updateFields": {
"status": "ready_to_ship"
},
"whereCondition": {
"order_ref": "{{dataId}}"
}
},
"onError": "rollback"
}
],
"transactionMode": "sequential", // 'sequential' | 'parallel'
"rollbackStrategy": "all" // 'all' | 'completed_only' | 'none'
}
3. 백엔드 서비스 구조
3.1 서비스 계층 구조
flowDataMoveService (기존)
└── FlowIntegrationService (신규)
├── InternalDbIntegration
├── ExternalDbIntegration
├── RestApiIntegration
├── WebhookIntegration
└── HybridIntegration
3.2 주요 인터페이스
// 통합 인터페이스
interface FlowIntegration {
execute(context: IntegrationContext): Promise<IntegrationResult>;
validate(config: any): ValidationResult;
rollback(context: IntegrationContext): Promise<void>;
}
// 실행 컨텍스트
interface IntegrationContext {
flowId: number;
stepId: number;
dataId: string | number;
tableName?: string;
currentUser: string;
variables: Record<string, any>; // 템플릿 변수
transactionId?: string;
}
// 실행 결과
interface IntegrationResult {
success: boolean;
message?: string;
data?: any;
error?: {
code: string;
message: string;
details?: any;
};
rollbackInfo?: any; // 롤백을 위한 정보
}
3.3 외부 DB 연동 서비스
export class ExternalDbIntegration implements FlowIntegration {
private connectionPool: Map<number, any> = new Map();
async execute(context: IntegrationContext): Promise<IntegrationResult> {
const config = context.step.integrationConfig;
// 1. 연결 정보 조회
const connection = await this.getConnection(config.connectionId);
// 2. 쿼리 생성 (템플릿 변수 치환)
const query = this.buildQuery(config, context);
// 3. 실행
try {
const result = await this.executeQuery(connection, query);
return {
success: true,
data: result,
rollbackInfo: {
query: this.buildRollbackQuery(config, context),
connection: config.connectionId,
},
};
} catch (error) {
return {
success: false,
error: {
code: "EXTERNAL_DB_ERROR",
message: error.message,
details: error,
},
};
}
}
async getConnection(connectionId: number) {
// 연결 풀에서 가져오거나 새로 생성
if (this.connectionPool.has(connectionId)) {
return this.connectionPool.get(connectionId);
}
const connInfo = await this.loadConnectionInfo(connectionId);
const connection = await this.createConnection(connInfo);
this.connectionPool.set(connectionId, connection);
return connection;
}
private buildQuery(config: any, context: IntegrationContext): string {
// 템플릿 변수 치환
const replacedConfig = this.replaceVariables(config, context);
switch (config.operation) {
case "update":
return this.buildUpdateQuery(replacedConfig);
case "insert":
return this.buildInsertQuery(replacedConfig);
case "delete":
return this.buildDeleteQuery(replacedConfig);
case "custom":
return replacedConfig.customQuery;
default:
throw new Error(`Unsupported operation: ${config.operation}`);
}
}
async rollback(context: IntegrationContext): Promise<void> {
const rollbackInfo = context.rollbackInfo;
const connection = await this.getConnection(rollbackInfo.connection);
await this.executeQuery(connection, rollbackInfo.query);
}
}
3.4 REST API 연동 서비스
export class RestApiIntegration implements FlowIntegration {
private axiosInstances: Map<number, AxiosInstance> = new Map();
async execute(context: IntegrationContext): Promise<IntegrationResult> {
const config = context.step.integrationConfig;
// 1. API 클라이언트 생성
const client = await this.getApiClient(config.connectionId);
// 2. 요청 구성 (템플릿 변수 치환)
const request = this.buildRequest(config, context);
// 3. API 호출
try {
const response = await this.executeRequest(client, request);
// 4. 성공 조건 검증
const isSuccess = this.validateSuccess(response, config.successCondition);
if (isSuccess) {
return {
success: true,
data: response.data,
rollbackInfo: {
compensatingRequest: this.buildCompensatingRequest(
config,
context,
response
),
},
};
} else {
throw new Error("API call succeeded but validation failed");
}
} catch (error) {
// 에러 처리 및 재시도
return this.handleError(error, config, context);
}
}
private async executeRequest(
client: AxiosInstance,
request: any
): Promise<AxiosResponse> {
const { method, endpoint, headers, body, timeout } = request;
return await client.request({
method,
url: endpoint,
headers,
data: body,
timeout: timeout || 30000,
});
}
private async handleError(
error: any,
config: any,
context: IntegrationContext
): Promise<IntegrationResult> {
const errorHandling = config.errorHandling;
if (errorHandling.onFailure === "retry") {
// 재시도 로직
for (let i = 0; i < errorHandling.maxRetries; i++) {
await this.delay(errorHandling.retryDelay);
try {
return await this.execute(context);
} catch (retryError) {
if (i === errorHandling.maxRetries - 1) {
throw retryError;
}
}
}
}
return {
success: false,
error: {
code: "REST_API_ERROR",
message: error.message,
details: error.response?.data,
},
};
}
async rollback(context: IntegrationContext): Promise<void> {
const rollbackInfo = context.rollbackInfo;
if (rollbackInfo.compensatingRequest) {
const client = await this.getApiClient(rollbackInfo.connectionId);
await this.executeRequest(client, rollbackInfo.compensatingRequest);
}
}
}
4. 프론트엔드 UI 확장
4.1 플로우 단계 설정 패널 확장
// FlowStepPanel.tsx에 추가
// 연동 타입 선택
<Select value={integrationType} onValueChange={setIntegrationType}>
<SelectItem value="internal">내부 DB</SelectItem>
<SelectItem value="external_db">외부 DB</SelectItem>
<SelectItem value="rest_api">REST API</SelectItem>
<SelectItem value="webhook">Webhook</SelectItem>
<SelectItem value="hybrid">복합 연동</SelectItem>
</Select>;
// 연동 타입별 설정 UI
{
integrationType === "external_db" && (
<ExternalDbConfigPanel
config={integrationConfig}
onChange={setIntegrationConfig}
/>
);
}
{
integrationType === "rest_api" && (
<RestApiConfigPanel
config={integrationConfig}
onChange={setIntegrationConfig}
/>
);
}
4.2 외부 DB 설정 패널
export function ExternalDbConfigPanel({ config, onChange }) {
return (
<div className="space-y-4">
{/* 연결 선택 */}
<Select value={config.connectionId}>
<SelectLabel>외부 DB 연결</SelectLabel>
{externalConnections.map((conn) => (
<SelectItem key={conn.id} value={conn.id}>
{conn.name} ({conn.dbType})
</SelectItem>
))}
</Select>
{/* 작업 타입 */}
<Select value={config.operation}>
<SelectItem value="update">업데이트</SelectItem>
<SelectItem value="insert">삽입</SelectItem>
<SelectItem value="delete">삭제</SelectItem>
<SelectItem value="custom">커스텀 쿼리</SelectItem>
</Select>
{/* 테이블명 */}
<Input
label="테이블명"
value={config.tableName}
onChange={(e) => onChange({ ...config, tableName: e.target.value })}
/>
{/* 업데이트 필드 */}
<FieldMapper
fields={config.updateFields}
onChange={(fields) => onChange({ ...config, updateFields: fields })}
/>
{/* WHERE 조건 */}
<ConditionBuilder
conditions={config.whereCondition}
onChange={(conditions) =>
onChange({ ...config, whereCondition: conditions })
}
/>
</div>
);
}
4.3 REST API 설정 패널
export function RestApiConfigPanel({ config, onChange }) {
return (
<div className="space-y-4">
{/* API 연결 선택 */}
<Select value={config.connectionId}>
<SelectLabel>API 연결</SelectLabel>
{apiConnections.map((conn) => (
<SelectItem key={conn.id} value={conn.id}>
{conn.name} ({conn.baseUrl})
</SelectItem>
))}
</Select>
{/* HTTP 메서드 */}
<Select value={config.method}>
<SelectItem value="GET">GET</SelectItem>
<SelectItem value="POST">POST</SelectItem>
<SelectItem value="PUT">PUT</SelectItem>
<SelectItem value="PATCH">PATCH</SelectItem>
<SelectItem value="DELETE">DELETE</SelectItem>
</Select>
{/* 엔드포인트 */}
<Input
label="엔드포인트"
placeholder="/api/orders/{{dataId}}/approve"
value={config.endpoint}
onChange={(e) => onChange({ ...config, endpoint: e.target.value })}
/>
{/* 헤더 */}
<KeyValueEditor
label="헤더"
data={config.headers}
onChange={(headers) => onChange({ ...config, headers })}
/>
{/* 요청 본문 */}
<JsonEditor
label="요청 본문"
value={config.body}
onChange={(body) => onChange({ ...config, body })}
/>
{/* 성공 조건 */}
<SuccessConditionEditor
condition={config.successCondition}
onChange={(condition) =>
onChange({ ...config, successCondition: condition })
}
/>
</div>
);
}
5. 보안 고려사항
5.1 자격 증명 암호화
// 비밀번호/토큰 암호화
import crypto from "crypto";
export class CredentialEncryption {
private algorithm = "aes-256-gcm";
private key: Buffer;
constructor(secretKey: string) {
this.key = crypto.scryptSync(secretKey, "salt", 32);
}
encrypt(text: string): string {
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv(this.algorithm, this.key, iv);
let encrypted = cipher.update(text, "utf8", "hex");
encrypted += cipher.final("hex");
const authTag = cipher.getAuthTag();
return `${iv.toString("hex")}:${authTag.toString("hex")}:${encrypted}`;
}
decrypt(encrypted: string): string {
const [ivHex, authTagHex, encryptedText] = encrypted.split(":");
const iv = Buffer.from(ivHex, "hex");
const authTag = Buffer.from(authTagHex, "hex");
const decipher = crypto.createDecipheriv(this.algorithm, this.key, iv);
decipher.setAuthTag(authTag);
let decrypted = decipher.update(encryptedText, "hex", "utf8");
decrypted += decipher.final("utf8");
return decrypted;
}
}
5.2 권한 관리
-- 외부 연결 접근 권한
CREATE TABLE external_connection_permission (
id SERIAL PRIMARY KEY,
connection_type VARCHAR(50) NOT NULL, -- 'db' | 'api'
connection_id INTEGER NOT NULL,
user_id INTEGER,
role_id INTEGER,
can_view BOOLEAN DEFAULT false,
can_use BOOLEAN DEFAULT false,
can_edit BOOLEAN DEFAULT false,
can_delete BOOLEAN DEFAULT false,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
6. 모니터링 및 로깅
6.1 외부 연동 로그
CREATE TABLE flow_integration_log (
id SERIAL PRIMARY KEY,
flow_definition_id INTEGER NOT NULL,
step_id INTEGER NOT NULL,
data_id VARCHAR(100),
integration_type VARCHAR(50) NOT NULL,
connection_id INTEGER,
request_payload JSONB,
response_payload JSONB,
status VARCHAR(50) NOT NULL, -- 'success' | 'failed' | 'timeout' | 'rollback'
error_message TEXT,
execution_time_ms INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_flow_integration_log_flow ON flow_integration_log(flow_definition_id);
CREATE INDEX idx_flow_integration_log_status ON flow_integration_log(status);
CREATE INDEX idx_flow_integration_log_created ON flow_integration_log(created_at);
7. 구현 우선순위
Phase 1: 외부 DB 연동 (2-3주)
- 외부 DB 연결 정보 관리 UI
- ExternalDbIntegration 서비스 구현
- 플로우 단계 설정에서 외부 DB 선택 기능
- 테스트 및 검증
Phase 2: REST API 연동 (2-3주)
- API 연결 정보 관리 UI
- RestApiIntegration 서비스 구현
- 템플릿 변수 시스템 구축
- 재시도 및 에러 처리
Phase 3: 복합 연동 (2주)
- HybridIntegration 서비스 구현
- 트랜잭션 관리 및 롤백
- UI에서 복합 시나리오 구성
Phase 4: 모니터링 및 최적화 (1-2주)
- 로깅 시스템 구축
- 성능 모니터링 대시보드
- 알림 시스템
8. 사용 예시
예시 1: 주문 승인 시 외부 ERP 시스템 업데이트
플로우: 주문 승인 프로세스
↓
검토중 단계
↓
승인됨 단계 (외부 연동)
- 내부 DB: orders.status = 'approved'
- 외부 ERP API: POST /api/orders/approve
{
"orderId": "{{dataId}}",
"approvedBy": "{{currentUser}}",
"approvedAt": "{{timestamp}}"
}
- Webhook: 회계 시스템에 승인 알림
예시 2: 재고 이동 시 창고 관리 DB 업데이트
플로우: 재고 이동 프로세스
↓
이동 요청 단계
↓
이동 완료 단계 (외부 DB 연동)
- 내부 DB: inventory_transfer.status = 'completed'
- 외부 창고 DB:
UPDATE warehouse_stock
SET quantity = quantity - {{transferQty}}
WHERE product_id = {{productId}}
AND warehouse_id = {{fromWarehouse}}
9. 기대 효과
- 시스템 통합: 여러 시스템 간 데이터 동기화 자동화
- 업무 효율: 수동 데이터 입력 감소
- 실시간 연동: 상태 변경 즉시 외부 시스템에 반영
- 확장성: 새로운 외부 시스템 쉽게 추가
- 트랜잭션 보장: 롤백 기능으로 데이터 일관성 유지
10. 참고사항
- 외부 연동 설정은 관리자 권한 필요
- 모든 외부 호출은 로그 기록
- 타임아웃 및 재시도 정책 필수 설정
- 정기적인 연결 상태 모니터링 필요
- 보안을 위해 자격 증명은 반드시 암호화