/** * 플로우 데이터 이동 서비스 (하이브리드 방식 지원) * - 상태 변경 방식: 같은 테이블 내에서 상태 컬럼 업데이트 * - 테이블 이동 방식: 다른 테이블로 데이터 복사 및 매핑 * - 하이브리드 방식: 두 가지 모두 수행 */ import db from "../database/db"; import { FlowAuditLog, FlowIntegrationContext } from "../types/flow"; import { FlowDefinitionService } from "./flowDefinitionService"; import { FlowStepService } from "./flowStepService"; import { FlowExternalDbIntegrationService } from "./flowExternalDbIntegrationService"; export class FlowDataMoveService { private flowDefinitionService: FlowDefinitionService; private flowStepService: FlowStepService; private externalDbIntegrationService: FlowExternalDbIntegrationService; constructor() { this.flowDefinitionService = new FlowDefinitionService(); this.flowStepService = new FlowStepService(); this.externalDbIntegrationService = new FlowExternalDbIntegrationService(); } /** * 데이터를 다음 플로우 단계로 이동 (하이브리드 지원) */ async moveDataToStep( flowId: number, fromStepId: number, toStepId: number, dataId: any, userId: string = "system", additionalData?: Record ): Promise<{ success: boolean; targetDataId?: any; message?: string }> { return await db.transaction(async (client) => { try { // 1. 단계 정보 조회 const fromStep = await this.flowStepService.findById(fromStepId); const toStep = await this.flowStepService.findById(toStepId); if (!fromStep || !toStep) { throw new Error("유효하지 않은 단계입니다"); } let targetDataId = dataId; let sourceTable = fromStep.tableName; let targetTable = toStep.tableName || fromStep.tableName; // 2. 이동 방식에 따라 처리 switch (toStep.moveType || "status") { case "status": // 상태 변경 방식 await this.moveByStatusChange( client, fromStep, toStep, dataId, additionalData ); break; case "table": // 테이블 이동 방식 targetDataId = await this.moveByTableTransfer( client, fromStep, toStep, dataId, additionalData ); targetTable = toStep.targetTable || toStep.tableName; break; case "both": // 하이브리드 방식: 둘 다 수행 await this.moveByStatusChange( client, fromStep, toStep, dataId, additionalData ); targetDataId = await this.moveByTableTransfer( client, fromStep, toStep, dataId, additionalData ); targetTable = toStep.targetTable || toStep.tableName; break; default: throw new Error(`지원하지 않는 이동 방식: ${toStep.moveType}`); } // 3. 매핑 테이블 업데이트 (테이블 이동 방식일 때) if (toStep.moveType === "table" || toStep.moveType === "both") { await this.updateDataMapping( client, flowId, toStepId, fromStepId, dataId, targetDataId ); } // 4. 외부 DB 연동 실행 (설정된 경우) if ( toStep.integrationType && toStep.integrationType !== "internal" && toStep.integrationConfig ) { await this.executeExternalIntegration( toStep, flowId, targetDataId, sourceTable, userId, additionalData ); } // 5. 감사 로그 기록 await this.logDataMove(client, { flowId, fromStepId, toStepId, moveType: toStep.moveType || "status", sourceTable, targetTable, sourceDataId: String(dataId), targetDataId: String(targetDataId), statusFrom: fromStep.statusValue, statusTo: toStep.statusValue, userId, }); return { success: true, targetDataId, message: "데이터가 성공적으로 이동되었습니다", }; } catch (error: any) { console.error("데이터 이동 실패:", error); throw error; } }); } /** * 상태 변경 방식으로 데이터 이동 */ private async moveByStatusChange( client: any, fromStep: any, toStep: any, dataId: any, additionalData?: Record ): Promise { const statusColumn = toStep.statusColumn || "flow_status"; const tableName = fromStep.tableName; // 추가 필드 업데이트 준비 const updates: string[] = [`${statusColumn} = $2`, `updated_at = NOW()`]; const values: any[] = [dataId, toStep.statusValue]; let paramIndex = 3; // 추가 데이터가 있으면 함께 업데이트 if (additionalData) { for (const [key, value] of Object.entries(additionalData)) { updates.push(`${key} = $${paramIndex}`); values.push(value); paramIndex++; } } const updateQuery = ` UPDATE ${tableName} SET ${updates.join(", ")} WHERE id = $1 `; const result = await client.query(updateQuery, values); if (result.rowCount === 0) { throw new Error(`데이터를 찾을 수 없습니다: ${dataId}`); } } /** * 테이블 이동 방식으로 데이터 이동 */ private async moveByTableTransfer( client: any, fromStep: any, toStep: any, dataId: any, additionalData?: Record ): Promise { const sourceTable = fromStep.tableName; const targetTable = toStep.targetTable || toStep.tableName; const fieldMappings = toStep.fieldMappings || {}; // 1. 소스 데이터 조회 const selectQuery = `SELECT * FROM ${sourceTable} WHERE id = $1`; const sourceResult = await client.query(selectQuery, [dataId]); if (sourceResult.length === 0) { throw new Error(`소스 데이터를 찾을 수 없습니다: ${dataId}`); } const sourceData = sourceResult[0]; // 2. 필드 매핑 적용 const mappedData: Record = {}; // 매핑 정의가 있으면 적용 for (const [sourceField, targetField] of Object.entries(fieldMappings)) { if (sourceData[sourceField] !== undefined) { mappedData[targetField as string] = sourceData[sourceField]; } } // 추가 데이터 병합 if (additionalData) { Object.assign(mappedData, additionalData); } // 3. 타겟 테이블에 데이터 삽입 if (Object.keys(mappedData).length === 0) { throw new Error("매핑할 데이터가 없습니다"); } const columns = Object.keys(mappedData); const values = Object.values(mappedData); const placeholders = columns.map((_, i) => `$${i + 1}`).join(", "); const insertQuery = ` INSERT INTO ${targetTable} (${columns.join(", ")}) VALUES (${placeholders}) RETURNING id `; const insertResult = await client.query(insertQuery, values); return insertResult[0].id; } /** * 데이터 매핑 테이블 업데이트 */ private async updateDataMapping( client: any, flowId: number, currentStepId: number, prevStepId: number, sourceDataId: any, targetDataId: any ): Promise { // 기존 매핑 조회 const selectQuery = ` SELECT id, step_data_map FROM flow_data_mapping WHERE flow_definition_id = $1 AND step_data_map->$2 = $3 `; const mappingResult = await client.query(selectQuery, [ flowId, String(prevStepId), JSON.stringify(String(sourceDataId)), ]); const stepDataMap: Record = mappingResult.length > 0 ? mappingResult[0].step_data_map : {}; // 새 단계 데이터 추가 stepDataMap[String(currentStepId)] = String(targetDataId); if (mappingResult.length > 0) { // 기존 매핑 업데이트 const updateQuery = ` UPDATE flow_data_mapping SET current_step_id = $1, step_data_map = $2, updated_at = NOW() WHERE id = $3 `; await client.query(updateQuery, [ currentStepId, JSON.stringify(stepDataMap), mappingResult[0].id, ]); } else { // 새 매핑 생성 const insertQuery = ` INSERT INTO flow_data_mapping (flow_definition_id, current_step_id, step_data_map) VALUES ($1, $2, $3) `; await client.query(insertQuery, [ flowId, currentStepId, JSON.stringify(stepDataMap), ]); } } /** * 감사 로그 기록 */ private async logDataMove(client: any, params: any): Promise { const query = ` INSERT INTO flow_audit_log ( flow_definition_id, from_step_id, to_step_id, move_type, source_table, target_table, source_data_id, target_data_id, status_from, status_to, changed_by, note ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) `; await client.query(query, [ params.flowId, params.fromStepId, params.toStepId, params.moveType, params.sourceTable, params.targetTable, params.sourceDataId, params.targetDataId, params.statusFrom, params.statusTo, params.userId, params.note || null, ]); } /** * 여러 데이터를 동시에 다음 단계로 이동 */ async moveBatchData( flowId: number, fromStepId: number, toStepId: number, dataIds: any[], userId: string = "system" ): Promise<{ success: boolean; results: any[] }> { const results = []; for (const dataId of dataIds) { try { const result = await this.moveDataToStep( flowId, fromStepId, toStepId, dataId, userId ); results.push({ dataId, ...result }); } catch (error: any) { results.push({ dataId, success: false, message: error.message }); } } return { success: results.every((r) => r.success), results, }; } /** * 데이터의 플로우 이력 조회 */ async getAuditLogs(flowId: number, dataId: string): Promise { const query = ` SELECT fal.*, fs_from.step_name as from_step_name, fs_to.step_name as to_step_name FROM flow_audit_log fal LEFT JOIN flow_step fs_from ON fal.from_step_id = fs_from.id LEFT JOIN flow_step fs_to ON fal.to_step_id = fs_to.id WHERE fal.flow_definition_id = $1 AND (fal.source_data_id = $2 OR fal.target_data_id = $2) ORDER BY fal.changed_at DESC `; const result = await db.query(query, [flowId, dataId]); return result.map((row) => ({ id: row.id, flowDefinitionId: row.flow_definition_id, tableName: row.table_name || row.source_table, recordId: row.record_id || row.source_data_id, fromStepId: row.from_step_id, toStepId: row.to_step_id, changedBy: row.changed_by, changedAt: row.changed_at, note: row.note, fromStepName: row.from_step_name, toStepName: row.to_step_name, moveType: row.move_type, sourceTable: row.source_table, targetTable: row.target_table, sourceDataId: row.source_data_id, targetDataId: row.target_data_id, statusFrom: row.status_from, statusTo: row.status_to, })); } /** * 특정 플로우의 모든 이력 조회 */ async getFlowAuditLogs( flowId: number, limit: number = 100 ): Promise { const query = ` SELECT fal.*, fs_from.step_name as from_step_name, fs_to.step_name as to_step_name FROM flow_audit_log fal LEFT JOIN flow_step fs_from ON fal.from_step_id = fs_from.id LEFT JOIN flow_step fs_to ON fal.to_step_id = fs_to.id WHERE fal.flow_definition_id = $1 ORDER BY fal.changed_at DESC LIMIT $2 `; const result = await db.query(query, [flowId, limit]); return result.map((row) => ({ id: row.id, flowDefinitionId: row.flow_definition_id, tableName: row.table_name || row.source_table, recordId: row.record_id || row.source_data_id, fromStepId: row.from_step_id, toStepId: row.to_step_id, changedBy: row.changed_by, changedAt: row.changed_at, note: row.note, fromStepName: row.from_step_name, toStepName: row.to_step_name, moveType: row.move_type, sourceTable: row.source_table, targetTable: row.target_table, sourceDataId: row.source_data_id, targetDataId: row.target_data_id, statusFrom: row.status_from, statusTo: row.status_to, })); } /** * 외부 DB 연동 실행 */ private async executeExternalIntegration( toStep: any, flowId: number, dataId: any, tableName: string | undefined, userId: string, additionalData?: Record ): Promise { const startTime = Date.now(); try { // 연동 컨텍스트 구성 const context: FlowIntegrationContext = { flowId, stepId: toStep.id, dataId, tableName, currentUser: userId, variables: { ...additionalData, stepName: toStep.stepName, stepId: toStep.id, }, }; // 연동 타입별 처리 switch (toStep.integrationType) { case "external_db": const result = await this.externalDbIntegrationService.execute( context, toStep.integrationConfig ); // 연동 로그 기록 await this.logIntegration( flowId, toStep.id, dataId, toStep.integrationType, toStep.integrationConfig.connectionId, toStep.integrationConfig, result.data, result.success ? "success" : "failed", result.error?.message, Date.now() - startTime, userId ); if (!result.success) { throw new Error( `외부 DB 연동 실패: ${result.error?.message || "알 수 없는 오류"}` ); } break; case "rest_api": // REST API 연동 (추후 구현) console.warn("REST API 연동은 아직 구현되지 않았습니다"); break; case "webhook": // Webhook 연동 (추후 구현) console.warn("Webhook 연동은 아직 구현되지 않았습니다"); break; case "hybrid": // 복합 연동 (추후 구현) console.warn("복합 연동은 아직 구현되지 않았습니다"); break; default: throw new Error(`지원하지 않는 연동 타입: ${toStep.integrationType}`); } } catch (error: any) { console.error("외부 연동 실행 실패:", error); // 연동 실패 로그 기록 await this.logIntegration( flowId, toStep.id, dataId, toStep.integrationType, toStep.integrationConfig?.connectionId, toStep.integrationConfig, null, "failed", error.message, Date.now() - startTime, userId ); throw error; } } /** * 외부 연동 로그 기록 */ private async logIntegration( flowId: number, stepId: number, dataId: any, integrationType: string, connectionId: number | undefined, requestPayload: any, responsePayload: any, status: "success" | "failed" | "timeout" | "rollback", errorMessage: string | undefined, executionTimeMs: number, userId: string ): Promise { const query = ` INSERT INTO flow_integration_log ( flow_definition_id, step_id, data_id, integration_type, connection_id, request_payload, response_payload, status, error_message, execution_time_ms, executed_by, executed_at ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW()) `; await db.query(query, [ flowId, stepId, String(dataId), integrationType, connectionId || null, requestPayload ? JSON.stringify(requestPayload) : null, responsePayload ? JSON.stringify(responsePayload) : null, status, errorMessage || null, executionTimeMs, userId, ]); } }