/**
 * Flow data move service (hybrid mode supported).
 * - Status change: update the status column of the row inside the same table
 * - Table transfer: copy the row into another table, applying field mappings
 * - Hybrid ("both"): perform both of the above
 */
import db from "../database/db";
import {
  FlowAuditLog,
  FlowIntegrationContext,
  FlowDefinition,
} from "../types/flow";
import { FlowDefinitionService } from "./flowDefinitionService";
import { FlowStepService } from "./flowStepService";
import { FlowExternalDbIntegrationService } from "./flowExternalDbIntegrationService";
import {
  getExternalPool,
  executeExternalQuery,
  executeExternalTransaction,
} from "./externalDbHelper";
import {
  getPlaceholder,
  buildUpdateQuery,
  buildInsertQuery,
  buildSelectQuery,
} from "./dbQueryBuilder";

export class FlowDataMoveService {
  /**
   * Accepts plain or dot-qualified SQL identifiers (e.g. `orders`,
   * `sales.orders`). Built through the RegExp constructor so the Unicode
   * property escapes do not depend on the TypeScript compile target.
   */
  private static readonly SAFE_IDENTIFIER = new RegExp(
    "^[\\p{L}_][\\p{L}\\p{N}_$]*(?:\\.[\\p{L}_][\\p{L}\\p{N}_$]*)*$",
    "u"
  );

  private readonly flowDefinitionService: FlowDefinitionService;
  private readonly flowStepService: FlowStepService;
  private readonly externalDbIntegrationService: FlowExternalDbIntegrationService;

  constructor() {
    this.flowDefinitionService = new FlowDefinitionService();
    this.flowStepService = new FlowStepService();
    this.externalDbIntegrationService = new FlowExternalDbIntegrationService();
  }

  /**
   * Ensures a table/column name is safe to interpolate into SQL text.
   *
   * Identifiers cannot be sent as bind parameters, so anything that ends up
   * in the SQL string (including `additionalData` keys) is checked here.
   *
   * @param name - Candidate identifier.
   * @param label - Human-readable kind of identifier, used in the error text.
   * @returns The identifier unchanged when it is valid.
   * @throws Error when `name` is not a string or contains unsafe characters.
   */
  private assertSafeIdentifier(name: unknown, label: string): string {
    if (
      typeof name !== "string" ||
      !FlowDataMoveService.SAFE_IDENTIFIER.test(name)
    ) {
      throw new Error(`유효하지 않은 ${label} 이름입니다: ${String(name)}`);
    }
    return name;
  }

  /**
   * Normalizes a query result to an array of rows.
   *
   * Within this file the transaction client is read both as `{ rows }` and as
   * a bare array, so both shapes are accepted.
   */
  private extractRows(result: any): any[] {
    if (Array.isArray(result)) {
      return result;
    }
    return Array.isArray(result?.rows) ? result.rows : [];
  }

  /**
   * Moves one record to the next flow step (status / table / both).
   *
   * @param flowId - Flow definition ID.
   * @param fromStepId - ID of the step the record currently belongs to.
   * @param toStepId - ID of the step the record is moved to.
   * @param dataId - Primary key (`id`) of the record in the source table.
   * @param userId - User recorded in the audit log; defaults to "system".
   * @param additionalData - Extra column values written together with the move.
   * @returns Result object containing the ID of the record in the target table.
   * @throws Error when the flow or a step is missing, the move type is not
   *   supported, or any of the underlying queries fails.
   */
  async moveDataToStep(
    flowId: number,
    fromStepId: number,
    toStepId: number,
    dataId: any,
    userId: string = "system",
    additionalData?: Record<string, any>
  ): Promise<{ success: boolean; targetDataId?: any; message?: string }> {
    // 0. Load the flow definition to find out which database holds the data.
    const flowDefinition = await this.flowDefinitionService.findById(flowId);
    if (!flowDefinition) {
      throw new Error(`플로우를 찾을 수 없습니다 (ID: ${flowId})`);
    }

    // External database: handled by a dedicated code path.
    if (
      flowDefinition.dbSourceType === "external" &&
      flowDefinition.dbConnectionId
    ) {
      return await this.moveDataToStepExternal(
        flowDefinition.dbConnectionId,
        fromStepId,
        toStepId,
        dataId,
        userId,
        additionalData
      );
    }

    // Internal database: everything below runs in a single transaction.
    return await db.transaction(async (client) => {
      try {
        // 1. Load both steps.
        const fromStep = await this.flowStepService.findById(fromStepId);
        const toStep = await this.flowStepService.findById(toStepId);

        if (!fromStep || !toStep) {
          throw new Error("유효하지 않은 단계입니다");
        }

        const moveType = toStep.moveType || "status";
        const sourceTable = fromStep.tableName;
        let targetDataId = dataId;
        let targetTable = toStep.tableName || fromStep.tableName;

        // 2. Perform the move according to the configured move type.
        switch (moveType) {
          case "status":
            await this.moveByStatusChange(
              client,
              fromStep,
              toStep,
              dataId,
              additionalData
            );
            break;

          case "table":
            targetDataId = await this.moveByTableTransfer(
              client,
              fromStep,
              toStep,
              dataId,
              additionalData
            );
            targetTable = toStep.targetTable || toStep.tableName;
            break;

          case "both":
            // Hybrid: update the status first, then copy the row.
            await this.moveByStatusChange(
              client,
              fromStep,
              toStep,
              dataId,
              additionalData
            );
            targetDataId = await this.moveByTableTransfer(
              client,
              fromStep,
              toStep,
              dataId,
              additionalData
            );
            targetTable = toStep.targetTable || toStep.tableName;
            break;

          default:
            throw new Error(`지원하지 않는 이동 방식: ${toStep.moveType}`);
        }

        // 3. Track source -> target IDs when a row was copied.
        if (moveType === "table" || moveType === "both") {
          await this.updateDataMapping(
            client,
            flowId,
            toStepId,
            fromStepId,
            dataId,
            targetDataId
          );
        }

        // 4. Run the external integration configured on the target step.
        if (
          toStep.integrationType &&
          toStep.integrationType !== "internal" &&
          toStep.integrationConfig
        ) {
          await this.executeExternalIntegration(
            toStep,
            flowId,
            targetDataId,
            sourceTable,
            userId,
            additionalData
          );
        }

        // 5. Audit log. Flows with a usable external connection returned
        // above, so this path always records the internal database.
        await this.logDataMove(client, {
          flowId,
          fromStepId,
          toStepId,
          moveType,
          sourceTable,
          targetTable,
          sourceDataId: String(dataId),
          targetDataId: String(targetDataId),
          statusFrom: fromStep.statusValue,
          statusTo: toStep.statusValue,
          userId,
          dbConnectionId: null,
          dbConnectionName: "내부 데이터베이스",
        });

        return {
          success: true,
          targetDataId,
          message: "데이터가 성공적으로 이동되었습니다",
        };
      } catch (error) {
        console.error("데이터 이동 실패:", error);
        throw error;
      }
    });
  }

  /**
   * Moves a record by updating its status column in the source table.
   *
   * @throws Error when the target step has no status column, an identifier is
   *   unsafe, or no row with the given ID exists.
   */
  private async moveByStatusChange(
    client: any,
    fromStep: any,
    toStep: any,
    dataId: any,
    additionalData?: Record<string, any>
  ): Promise<void> {
    // A status column is mandatory for this move type.
    if (!toStep.statusColumn) {
      throw new Error(
        `단계 "${toStep.stepName}"의 상태 컬럼이 지정되지 않았습니다. 플로우 편집 화면에서 "상태 컬럼명"을 설정해주세요.`
      );
    }

    const statusColumn = this.assertSafeIdentifier(
      toStep.statusColumn,
      "상태 컬럼"
    );
    const tableName = this.assertSafeIdentifier(fromStep.tableName, "테이블");

    // $1 is the row ID and $2 the new status; extra columns start at $3.
    const assignments: string[] = [
      `${statusColumn} = $2`,
      `updated_at = NOW()`,
    ];
    const values: any[] = [dataId, toStep.statusValue];

    for (const [column, value] of Object.entries(additionalData ?? {})) {
      values.push(value);
      assignments.push(
        `${this.assertSafeIdentifier(column, "컬럼")} = $${values.length}`
      );
    }

    const updateQuery = `
      UPDATE ${tableName}
      SET ${assignments.join(", ")}
      WHERE id = $1
    `;

    const result = await client.query(updateQuery, values);

    if (result.rowCount === 0) {
      throw new Error(`데이터를 찾을 수 없습니다: ${dataId}`);
    }
  }

  /**
   * Moves a record by copying it into the target table.
   *
   * NOTE(review): `fieldMappings` is read here as `{ sourceField: targetField }`
   * while `moveByTableTransferExternal` reads it as
   * `{ targetField: sourceField }` — confirm which direction is intended.
   *
   * @returns The `id` of the newly inserted row.
   * @throws Error when the source row is missing, nothing is mapped, or an
   *   identifier is unsafe.
   */
  private async moveByTableTransfer(
    client: any,
    fromStep: any,
    toStep: any,
    dataId: any,
    additionalData?: Record<string, any>
  ): Promise<any> {
    const sourceTable = this.assertSafeIdentifier(
      fromStep.tableName,
      "소스 테이블"
    );
    const targetTable = this.assertSafeIdentifier(
      toStep.targetTable || toStep.tableName,
      "타겟 테이블"
    );
    const fieldMappings: Record<string, unknown> = toStep.fieldMappings || {};

    // 1. Load the source row. The result is normalized because the client may
    // return either `{ rows }` or a bare array.
    const sourceRows = this.extractRows(
      await client.query(`SELECT * FROM ${sourceTable} WHERE id = $1`, [dataId])
    );

    if (sourceRows.length === 0) {
      throw new Error(`소스 데이터를 찾을 수 없습니다: ${dataId}`);
    }

    const sourceData = sourceRows[0];

    // 2. Apply field mappings, then let additionalData override mapped values.
    const mappedData: Record<string, any> = {};
    for (const [sourceField, targetField] of Object.entries(fieldMappings)) {
      if (sourceData[sourceField] !== undefined) {
        mappedData[targetField as string] = sourceData[sourceField];
      }
    }
    if (additionalData) {
      Object.assign(mappedData, additionalData);
    }

    // 3. Insert into the target table.
    const columns = Object.keys(mappedData);
    if (columns.length === 0) {
      throw new Error("매핑할 데이터가 없습니다");
    }

    const columnList = columns
      .map((column) => this.assertSafeIdentifier(column, "컬럼"))
      .join(", ");
    const placeholders = columns.map((_, i) => `$${i + 1}`).join(", ");

    const insertQuery = `
      INSERT INTO ${targetTable} (${columnList})
      VALUES (${placeholders})
      RETURNING id
    `;

    const insertedRows = this.extractRows(
      await client.query(insertQuery, Object.values(mappedData))
    );
    return insertedRows[0]?.id;
  }

  /**
   * Records which record ID represents the data at each step.
   *
   * Looks up the mapping row whose entry for the previous step equals the
   * source ID and adds the current step; creates a new row when none exists.
   */
  private async updateDataMapping(
    client: any,
    flowId: number,
    currentStepId: number,
    prevStepId: number,
    sourceDataId: any,
    targetDataId: any
  ): Promise<void> {
    // Find an existing mapping for the source record.
    const selectQuery = `
      SELECT id, step_data_map
      FROM flow_data_mapping
      WHERE flow_definition_id = $1
        AND step_data_map->$2 = $3
    `;
    const existing = this.extractRows(
      await client.query(selectQuery, [
        flowId,
        String(prevStepId),
        JSON.stringify(String(sourceDataId)),
      ])
    )[0];

    const stepDataMap: Record<string, string> = existing?.step_data_map ?? {};
    stepDataMap[String(currentStepId)] = String(targetDataId);

    if (existing) {
      // Update the existing mapping.
      const updateQuery = `
        UPDATE flow_data_mapping
        SET current_step_id = $1,
            step_data_map = $2,
            updated_at = NOW()
        WHERE id = $3
      `;
      await client.query(updateQuery, [
        currentStepId,
        JSON.stringify(stepDataMap),
        existing.id,
      ]);
      return;
    }

    // Create a new mapping.
    const insertQuery = `
      INSERT INTO flow_data_mapping
        (flow_definition_id, current_step_id, step_data_map)
      VALUES ($1, $2, $3)
    `;
    await client.query(insertQuery, [
      flowId,
      currentStepId,
      JSON.stringify(stepDataMap),
    ]);
  }

  /**
   * Writes one row to `flow_audit_log` using the given client.
   * Missing `note`, `dbConnectionId` and `dbConnectionName` are stored as NULL.
   */
  private async logDataMove(client: any, params: any): Promise<void> {
    const query = `
      INSERT INTO flow_audit_log (
        flow_definition_id, from_step_id, to_step_id,
        move_type, source_table, target_table,
        source_data_id, target_data_id,
        status_from, status_to,
        changed_by, note,
        db_connection_id, db_connection_name
      ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
    `;

    await client.query(query, [
      params.flowId,
      params.fromStepId,
      params.toStepId,
      params.moveType,
      params.sourceTable,
      params.targetTable,
      params.sourceDataId,
      params.targetDataId,
      params.statusFrom,
      params.statusTo,
      params.userId,
      params.note || null,
      params.dbConnectionId || null,
      params.dbConnectionName || null,
    ]);
  }

  /**
   * Moves several records to the next step, one after another.
   *
   * Each record is moved independently (its own `moveDataToStep` call); a
   * failure is captured in that record's result and does not stop the batch.
   *
   * @param additionalData - Optional extra column values applied to every record.
   * @returns `success` is true only when every record succeeded.
   */
  async moveBatchData(
    flowId: number,
    fromStepId: number,
    toStepId: number,
    dataIds: any[],
    userId: string = "system",
    additionalData?: Record<string, any>
  ): Promise<{ success: boolean; results: any[] }> {
    const results: Array<{
      dataId: any;
      success: boolean;
      targetDataId?: any;
      message?: string;
    }> = [];

    for (const dataId of dataIds) {
      try {
        const result = await this.moveDataToStep(
          flowId,
          fromStepId,
          toStepId,
          dataId,
          userId,
          additionalData
        );
        results.push({ dataId, ...result });
      } catch (error: unknown) {
        results.push({
          dataId,
          success: false,
          message: error instanceof Error ? error.message : String(error),
        });
      }
    }

    return {
      success: results.every((r) => r.success),
      results,
    };
  }

  /**
   * Converts a `flow_audit_log` row (joined with step names) to camelCase.
   * `tableName` / `recordId` fall back to the source table / source data ID.
   */
  private mapAuditLogRow(row: any) {
    return {
      id: row.id,
      flowDefinitionId: row.flow_definition_id,
      tableName: row.table_name || row.source_table,
      recordId: row.record_id || row.source_data_id,
      fromStepId: row.from_step_id,
      toStepId: row.to_step_id,
      changedBy: row.changed_by,
      changedAt: row.changed_at,
      note: row.note,
      fromStepName: row.from_step_name,
      toStepName: row.to_step_name,
      moveType: row.move_type,
      sourceTable: row.source_table,
      targetTable: row.target_table,
      sourceDataId: row.source_data_id,
      targetDataId: row.target_data_id,
      statusFrom: row.status_from,
      statusTo: row.status_to,
      dbConnectionId: row.db_connection_id,
      dbConnectionName: row.db_connection_name,
    };
  }

  /**
   * Returns the move history of one record within a flow, newest first.
   * A log entry matches when the ID is either its source or its target ID.
   */
  async getAuditLogs(flowId: number, dataId: string): Promise<FlowAuditLog[]> {
    const query = `
      SELECT
        fal.*,
        fs_from.step_name as from_step_name,
        fs_to.step_name as to_step_name
      FROM flow_audit_log fal
      LEFT JOIN flow_step fs_from ON fal.from_step_id = fs_from.id
      LEFT JOIN flow_step fs_to ON fal.to_step_id = fs_to.id
      WHERE fal.flow_definition_id = $1
        AND (fal.source_data_id = $2 OR fal.target_data_id = $2)
      ORDER BY fal.changed_at DESC
    `;

    const result = await db.query(query, [flowId, dataId]);
    return result.map((row: any) => this.mapAuditLogRow(row));
  }

  /**
   * Returns the most recent move history entries of a flow, newest first.
   *
   * @param limit - Maximum number of entries to return (default 100).
   */
  async getFlowAuditLogs(
    flowId: number,
    limit: number = 100
  ): Promise<FlowAuditLog[]> {
    const query = `
      SELECT
        fal.*,
        fs_from.step_name as from_step_name,
        fs_to.step_name as to_step_name
      FROM flow_audit_log fal
      LEFT JOIN flow_step fs_from ON fal.from_step_id = fs_from.id
      LEFT JOIN flow_step fs_to ON fal.to_step_id = fs_to.id
      WHERE fal.flow_definition_id = $1
      ORDER BY fal.changed_at DESC
      LIMIT $2
    `;

    const result = await db.query(query, [flowId, limit]);
    return result.map((row: any) => this.mapAuditLogRow(row));
  }

  /**
   * Runs the external integration configured on the target step and records
   * the outcome in `flow_integration_log`.
   *
   * Only "external_db" is implemented; "rest_api", "webhook" and "hybrid"
   * just emit a warning.
   *
   * @throws Error when the integration fails or the type is not supported.
   */
  private async executeExternalIntegration(
    toStep: any,
    flowId: number,
    dataId: any,
    tableName: string | undefined,
    userId: string,
    additionalData?: Record<string, any>
  ): Promise<void> {
    const startTime = Date.now();
    // Set once a "failed" entry has been written, so the catch block below
    // does not write a second entry for the same failure.
    let failureLogged = false;

    try {
      // Context handed to the integration service.
      const context: FlowIntegrationContext = {
        flowId,
        stepId: toStep.id,
        dataId,
        tableName,
        currentUser: userId,
        variables: {
          ...additionalData,
          stepName: toStep.stepName,
          stepId: toStep.id,
        },
      };

      switch (toStep.integrationType) {
        case "external_db": {
          const result = await this.externalDbIntegrationService.execute(
            context,
            toStep.integrationConfig
          );

          await this.logIntegration(
            flowId,
            toStep.id,
            dataId,
            toStep.integrationType,
            toStep.integrationConfig.connectionId,
            toStep.integrationConfig,
            result.data,
            result.success ? "success" : "failed",
            result.error?.message,
            Date.now() - startTime,
            userId
          );

          if (!result.success) {
            failureLogged = true;
            throw new Error(
              `외부 DB 연동 실패: ${result.error?.message || "알 수 없는 오류"}`
            );
          }
          break;
        }

        case "rest_api":
          // REST API integration (not implemented yet).
          console.warn("REST API 연동은 아직 구현되지 않았습니다");
          break;

        case "webhook":
          // Webhook integration (not implemented yet).
          console.warn("Webhook 연동은 아직 구현되지 않았습니다");
          break;

        case "hybrid":
          // Combined integration (not implemented yet).
          console.warn("복합 연동은 아직 구현되지 않았습니다");
          break;

        default:
          throw new Error(`지원하지 않는 연동 타입: ${toStep.integrationType}`);
      }
    } catch (error: unknown) {
      console.error("외부 연동 실행 실패:", error);

      if (!failureLogged) {
        try {
          await this.logIntegration(
            flowId,
            toStep.id,
            dataId,
            toStep.integrationType,
            toStep.integrationConfig?.connectionId,
            toStep.integrationConfig,
            null,
            "failed",
            error instanceof Error ? error.message : String(error),
            Date.now() - startTime,
            userId
          );
        } catch (logError) {
          // A failing log write must not hide the integration error itself.
          console.error("외부 연동 실패 로그 기록 실패:", logError);
        }
      }

      throw error;
    }
  }

  /**
   * Writes one row to `flow_integration_log`.
   *
   * Uses `db.query` directly rather than a transaction client passed in by
   * the caller. Payloads are stored as JSON strings, or NULL when falsy.
   */
  private async logIntegration(
    flowId: number,
    stepId: number,
    dataId: any,
    integrationType: string,
    connectionId: number | undefined,
    requestPayload: any,
    responsePayload: any,
    status: "success" | "failed" | "timeout" | "rollback",
    errorMessage: string | undefined,
    executionTimeMs: number,
    userId: string
  ): Promise<void> {
    const query = `
      INSERT INTO flow_integration_log (
        flow_definition_id, step_id, data_id, integration_type, connection_id,
        request_payload, response_payload, status, error_message,
        execution_time_ms, executed_by, executed_at
      ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW())
    `;

    await db.query(query, [
      flowId,
      stepId,
      String(dataId),
      integrationType,
      connectionId || null,
      requestPayload ? JSON.stringify(requestPayload) : null,
      responsePayload ? JSON.stringify(responsePayload) : null,
      status,
      errorMessage || null,
      executionTimeMs,
      userId,
    ]);
  }

  /**
   * Moves a record that lives in an external database.
   *
   * The move itself runs inside `executeExternalTransaction`; step
   * definitions and the audit log are read from / written to the internal DB.
   * Step-level external integrations are intentionally not executed here.
   *
   * NOTE(review): the audit row is written with `db.query` before the
   * callback returns, so it is not covered by the external transaction —
   * confirm what happens to it when the external commit fails.
   */
  private async moveDataToStepExternal(
    dbConnectionId: number,
    fromStepId: number,
    toStepId: number,
    dataId: any,
    userId: string = "system",
    additionalData?: Record<string, any>
  ): Promise<{ success: boolean; targetDataId?: any; message?: string }> {
    return await executeExternalTransaction(
      dbConnectionId,
      async (externalClient, dbType) => {
        try {
          // 1. Load both steps (from the internal DB).
          const fromStep = await this.flowStepService.findById(fromStepId);
          const toStep = await this.flowStepService.findById(toStepId);

          if (!fromStep || !toStep) {
            throw new Error("유효하지 않은 단계입니다");
          }

          const moveType = toStep.moveType || "status";
          const sourceTable = fromStep.tableName;
          let targetDataId = dataId;
          let targetTable = toStep.tableName || fromStep.tableName;

          // 2. Perform the move according to the configured move type.
          switch (moveType) {
            case "status":
              await this.moveByStatusChangeExternal(
                externalClient,
                dbType,
                fromStep,
                toStep,
                dataId,
                additionalData
              );
              break;

            case "table":
              targetDataId = await this.moveByTableTransferExternal(
                externalClient,
                dbType,
                fromStep,
                toStep,
                dataId,
                additionalData
              );
              targetTable = toStep.targetTable || toStep.tableName;
              break;

            case "both":
              // Hybrid: update the status first, then copy the row.
              await this.moveByStatusChangeExternal(
                externalClient,
                dbType,
                fromStep,
                toStep,
                dataId,
                additionalData
              );
              targetDataId = await this.moveByTableTransferExternal(
                externalClient,
                dbType,
                fromStep,
                toStep,
                dataId,
                additionalData
              );
              targetTable = toStep.targetTable || toStep.tableName;
              break;

            default:
              throw new Error(
                `지원하지 않는 이동 방식입니다: ${toStep.moveType}`
              );
          }

          // 3. Step integrations are skipped: the data source is already external.

          // 4. Best-effort lookup of the connection name for the audit log.
          let dbConnectionName = null;
          try {
            const connResult = await db.query(
              `SELECT connection_name FROM external_db_connections WHERE id = $1`,
              [dbConnectionId]
            );
            if (connResult.length > 0) {
              dbConnectionName = connResult[0].connection_name;
            }
          } catch (error) {
            console.warn("외부 DB 연결 이름 조회 실패:", error);
          }

          // 5. Audit log, written to the internal DB with a direct query.
          const auditQuery = `
            INSERT INTO flow_audit_log (
              flow_definition_id, from_step_id, to_step_id,
              move_type, source_table, target_table,
              source_data_id, target_data_id,
              status_from, status_to,
              changed_by, note,
              db_connection_id, db_connection_name
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
          `;

          await db.query(auditQuery, [
            toStep.flowDefinitionId,
            fromStep.id,
            toStep.id,
            moveType,
            sourceTable,
            targetTable,
            dataId,
            targetDataId,
            fromStep.statusValue ?? null, // statusFrom, as in the internal path
            toStep.statusValue || null, // statusTo
            userId,
            `외부 DB (${dbType}) 데이터 이동`,
            dbConnectionId,
            dbConnectionName,
          ]);

          return {
            success: true,
            targetDataId,
            message: `데이터 이동이 완료되었습니다 (외부 DB: ${dbType})`,
          };
        } catch (error) {
          console.error("외부 DB 데이터 이동 오류:", error);
          throw error;
        }
      }
    );
  }

  /**
   * Status-change move for an external database.
   *
   * Supports postgresql, mysql/mariadb, mssql and oracle; each driver is
   * called with its own API and reports the affected row count differently.
   *
   * @throws Error when the status column is missing, the DB type is not
   *   supported, or no row with the given ID exists.
   */
  private async moveByStatusChangeExternal(
    externalClient: any,
    dbType: string,
    fromStep: any,
    toStep: any,
    dataId: any,
    additionalData?: Record<string, any>
  ): Promise<void> {
    // A status column is mandatory for this move type.
    if (!toStep.statusColumn) {
      throw new Error(
        `단계 "${toStep.stepName}"의 상태 컬럼이 지정되지 않았습니다. 플로우 편집 화면에서 "상태 컬럼명"을 설정해주세요.`
      );
    }

    const statusColumn = toStep.statusColumn;
    const tableName = fromStep.tableName;
    const normalizedDbType = dbType.toLowerCase();

    // Columns to update: the status first, then any additional data.
    const updateFields: { column: string; value: any }[] = [
      { column: statusColumn, value: toStep.statusValue },
      ...Object.entries(additionalData ?? {}).map(([column, value]) => ({
        column,
        value,
      })),
    ];

    // NOTE(review): identifier quoting/validation is left to buildUpdateQuery,
    // which is not visible from this file — confirm it is injection-safe.
    const { query: updateQuery, values } = buildUpdateQuery(
      dbType,
      tableName,
      updateFields,
      "id"
    );

    // Assumes buildUpdateQuery reserves the last slot of `values` for the
    // WHERE value — TODO confirm against dbQueryBuilder.
    values[values.length - 1] = dataId;

    let affectedRows: number | undefined;

    switch (normalizedDbType) {
      case "postgresql": {
        const result = await externalClient.query(updateQuery, values);
        affectedRows = result.rowCount;
        break;
      }
      case "mysql":
      case "mariadb": {
        const [result] = await externalClient.query(updateQuery, values);
        affectedRows = result.affectedRows;
        break;
      }
      case "mssql": {
        const request = externalClient.request();
        values.forEach((val: any, idx: number) => {
          request.input(`p${idx + 1}`, val);
        });
        const result = await request.query(updateQuery);
        affectedRows = result.rowsAffected[0];
        break;
      }
      case "oracle": {
        const result = await externalClient.execute(updateQuery, values, {
          autoCommit: false,
        });
        affectedRows = result.rowsAffected;
        break;
      }
      default:
        throw new Error(`지원하지 않는 DB 타입입니다: ${dbType}`);
    }

    if (affectedRows === 0) {
      throw new Error(`데이터를 찾을 수 없습니다: ${dataId}`);
    }
  }

  /**
   * Table-transfer move for an external database.
   *
   * NOTE(review): `fieldMappings` is read here as `{ targetField: sourceField }`
   * while `moveByTableTransfer` reads it as `{ sourceField: targetField }` —
   * confirm which direction is intended.
   *
   * @returns The ID of the newly inserted row as reported by the driver.
   * @throws Error when the DB type is not supported, the source row is
   *   missing, or nothing is mapped.
   */
  private async moveByTableTransferExternal(
    externalClient: any,
    dbType: string,
    fromStep: any,
    toStep: any,
    dataId: any,
    additionalData?: Record<string, any>
  ): Promise<any> {
    const sourceTable = fromStep.tableName;
    const targetTable = toStep.targetTable || toStep.tableName;
    const fieldMappings: Record<string, unknown> = toStep.fieldMappings || {};
    const normalizedDbType = dbType.toLowerCase();

    // 1. Load the source row.
    const { query: selectQuery } = buildSelectQuery(dbType, sourceTable, "id");

    let sourceResult: any;
    switch (normalizedDbType) {
      case "postgresql":
        sourceResult = await externalClient.query(selectQuery, [dataId]);
        break;
      case "mysql":
      case "mariadb":
        [sourceResult] = await externalClient.query(selectQuery, [dataId]);
        break;
      case "mssql": {
        const request = externalClient.request();
        request.input("p1", dataId);
        const selected = await request.query(selectQuery);
        sourceResult = { rows: selected.recordset };
        break;
      }
      case "oracle":
        sourceResult = await externalClient.execute(selectQuery, [dataId], {
          autoCommit: false,
          // oracledb.OUT_FORMAT_OBJECT; 4001 is OUT_FORMAT_ARRAY, which would
          // make the by-name field lookup below find nothing.
          outFormat: 4002,
        });
        break;
      default:
        throw new Error(`지원하지 않는 DB 타입입니다: ${dbType}`);
    }

    const rows = sourceResult.rows || sourceResult;
    if (!rows || rows.length === 0) {
      throw new Error(`소스 데이터를 찾을 수 없습니다: ${dataId}`);
    }

    const sourceData = rows[0];

    // 2. Apply field mappings, then let additionalData override mapped values.
    const targetData: Record<string, any> = {};
    for (const [targetField, sourceField] of Object.entries(fieldMappings)) {
      const sourceFieldKey = sourceField as string;
      if (sourceData[sourceFieldKey] !== undefined) {
        targetData[targetField] = sourceData[sourceFieldKey];
      }
    }
    if (additionalData) {
      Object.assign(targetData, additionalData);
    }

    // Same guard as the internal path: never insert an empty row.
    if (Object.keys(targetData).length === 0) {
      throw new Error("매핑할 데이터가 없습니다");
    }

    // 3. Insert into the target table.
    const { query: insertQuery, values } = buildInsertQuery(
      dbType,
      targetTable,
      targetData
    );

    let newDataId: any;
    switch (normalizedDbType) {
      case "postgresql": {
        const insertResult = await externalClient.query(insertQuery, values);
        newDataId = insertResult.rows[0].id;
        break;
      }
      case "mysql":
      case "mariadb": {
        const [insertResult] = await externalClient.query(insertQuery, values);
        newDataId = insertResult.insertId;
        break;
      }
      case "mssql": {
        const request = externalClient.request();
        values.forEach((val: any, idx: number) => {
          request.input(`p${idx + 1}`, val);
        });
        const insertResult = await request.query(insertQuery);
        newDataId = insertResult.recordset[0].id;
        break;
      }
      case "oracle": {
        // 3003 is oracledb.BIND_OUT.
        // NOTE(review): 2001 appears to be oracledb.STRING rather than NUMBER,
        // and node-oracledb documents OUT binds as part of the bind
        // parameters, not the options object — verify this path against
        // buildInsertQuery's Oracle output.
        const outBinds: any = { id: { dir: 3003, type: 2001 } };
        const insertResult = await externalClient.execute(insertQuery, values, {
          autoCommit: false,
          outBinds: outBinds,
        });
        newDataId = insertResult.outBinds.id[0];
        break;
      }
    }

    // 4. Optionally delete the source row (currently disabled):
    // const deletePlaceholder = getPlaceholder(dbType, 1);
    // await externalClient.query(`DELETE FROM ${sourceTable} WHERE id = ${deletePlaceholder}`, [dataId]);

    return newDataId;
  }
}