import { FlowExternalDbIntegrationConfig, FlowIntegrationContext, FlowIntegrationResult, } from "../types/flow"; import { FlowExternalDbConnectionService } from "./flowExternalDbConnectionService"; import { Pool } from "pg"; /** * 플로우 외부 DB 연동 실행 서비스 * 외부 데이터베이스에 대한 작업(INSERT, UPDATE, DELETE, CUSTOM QUERY) 수행 */ export class FlowExternalDbIntegrationService { private connectionService: FlowExternalDbConnectionService; constructor() { this.connectionService = new FlowExternalDbConnectionService(); } /** * 외부 DB 연동 실행 */ async execute( context: FlowIntegrationContext, config: FlowExternalDbIntegrationConfig ): Promise { const startTime = Date.now(); try { // 1. 연결 정보 조회 const connection = await this.connectionService.findById( config.connectionId ); if (!connection) { return { success: false, error: { code: "CONNECTION_NOT_FOUND", message: `외부 DB 연결 정보를 찾을 수 없습니다 (ID: ${config.connectionId})`, }, }; } if (!connection.isActive) { return { success: false, error: { code: "CONNECTION_INACTIVE", message: `외부 DB 연결이 비활성화 상태입니다 (${connection.name})`, }, }; } // 2. 쿼리 생성 (템플릿 변수 치환) const query = this.buildQuery(config, context); // 3. 실행 const pool = await this.connectionService.getConnectionPool(connection); const result = await this.executeQuery(pool, query); const executionTime = Date.now() - startTime; return { success: true, message: `외부 DB 작업 성공 (${config.operation}, ${executionTime}ms)`, data: result, rollbackInfo: { query: this.buildRollbackQuery(config, context, result), connectionId: config.connectionId, }, }; } catch (error: any) { const executionTime = Date.now() - startTime; return { success: false, error: { code: "EXTERNAL_DB_ERROR", message: error.message || "외부 DB 작업 실패", details: { operation: config.operation, tableName: config.tableName, executionTime, originalError: error, }, }, }; } } /** * 쿼리 실행 */ private async executeQuery( pool: Pool, query: { sql: string; params: any[] } ): Promise { const client = await pool.connect(); try { const result = await client.query(query.sql, query.params); return result.rows; } finally { client.release(); } } /** * 쿼리 빌드 (템플릿 변수 치환 포함) */ private buildQuery( config: FlowExternalDbIntegrationConfig, context: FlowIntegrationContext ): { sql: string; params: any[] } { let sql = ""; const params: any[] = []; let paramIndex = 1; switch (config.operation) { case "update": return this.buildUpdateQuery(config, context, paramIndex); case "insert": return this.buildInsertQuery(config, context, paramIndex); case "delete": return this.buildDeleteQuery(config, context, paramIndex); case "custom": return this.buildCustomQuery(config, context); default: throw new Error(`지원하지 않는 작업: ${config.operation}`); } } /** * UPDATE 쿼리 빌드 */ private buildUpdateQuery( config: FlowExternalDbIntegrationConfig, context: FlowIntegrationContext, startIndex: number ): { sql: string; params: any[] } { if (!config.updateFields || Object.keys(config.updateFields).length === 0) { throw new Error("UPDATE 작업에는 updateFields가 필요합니다"); } if ( !config.whereCondition || Object.keys(config.whereCondition).length === 0 ) { throw new Error("UPDATE 작업에는 whereCondition이 필요합니다"); } const setClauses: string[] = []; const params: any[] = []; let paramIndex = startIndex; // SET 절 생성 for (const [key, value] of Object.entries(config.updateFields)) { setClauses.push(`${key} = $${paramIndex}`); params.push(this.replaceVariables(value, context)); paramIndex++; } // WHERE 절 생성 const whereClauses: string[] = []; for (const [key, value] of Object.entries(config.whereCondition)) { whereClauses.push(`${key} = $${paramIndex}`); params.push(this.replaceVariables(value, context)); paramIndex++; } const sql = `UPDATE ${config.tableName} SET ${setClauses.join(", ")} WHERE ${whereClauses.join(" AND ")}`; return { sql, params }; } /** * INSERT 쿼리 빌드 */ private buildInsertQuery( config: FlowExternalDbIntegrationConfig, context: FlowIntegrationContext, startIndex: number ): { sql: string; params: any[] } { if (!config.updateFields || Object.keys(config.updateFields).length === 0) { throw new Error("INSERT 작업에는 updateFields가 필요합니다"); } const columns: string[] = []; const placeholders: string[] = []; const params: any[] = []; let paramIndex = startIndex; for (const [key, value] of Object.entries(config.updateFields)) { columns.push(key); placeholders.push(`$${paramIndex}`); params.push(this.replaceVariables(value, context)); paramIndex++; } const sql = `INSERT INTO ${config.tableName} (${columns.join(", ")}) VALUES (${placeholders.join(", ")}) RETURNING *`; return { sql, params }; } /** * DELETE 쿼리 빌드 */ private buildDeleteQuery( config: FlowExternalDbIntegrationConfig, context: FlowIntegrationContext, startIndex: number ): { sql: string; params: any[] } { if ( !config.whereCondition || Object.keys(config.whereCondition).length === 0 ) { throw new Error("DELETE 작업에는 whereCondition이 필요합니다"); } const whereClauses: string[] = []; const params: any[] = []; let paramIndex = startIndex; for (const [key, value] of Object.entries(config.whereCondition)) { whereClauses.push(`${key} = $${paramIndex}`); params.push(this.replaceVariables(value, context)); paramIndex++; } const sql = `DELETE FROM ${config.tableName} WHERE ${whereClauses.join(" AND ")}`; return { sql, params }; } /** * CUSTOM 쿼리 빌드 */ private buildCustomQuery( config: FlowExternalDbIntegrationConfig, context: FlowIntegrationContext ): { sql: string; params: any[] } { if (!config.customQuery) { throw new Error("CUSTOM 작업에는 customQuery가 필요합니다"); } // 템플릿 변수 치환 const sql = this.replaceVariables(config.customQuery, context); // 커스텀 쿼리는 파라미터를 직접 관리 // 보안을 위해 가능하면 파라미터 바인딩 사용 권장 return { sql, params: [] }; } /** * 템플릿 변수 치환 */ private replaceVariables(value: any, context: FlowIntegrationContext): any { if (typeof value !== "string") { return value; } let result = value; // {{dataId}} 치환 result = result.replace(/\{\{dataId\}\}/g, String(context.dataId)); // {{currentUser}} 치환 result = result.replace(/\{\{currentUser\}\}/g, context.currentUser); // {{currentTimestamp}} 치환 result = result.replace( /\{\{currentTimestamp\}\}/g, new Date().toISOString() ); // {{flowId}} 치환 result = result.replace(/\{\{flowId\}\}/g, String(context.flowId)); // {{stepId}} 치환 result = result.replace(/\{\{stepId\}\}/g, String(context.stepId)); // {{tableName}} 치환 if (context.tableName) { result = result.replace(/\{\{tableName\}\}/g, context.tableName); } // context.variables의 커스텀 변수 치환 for (const [key, val] of Object.entries(context.variables)) { const regex = new RegExp(`\\{\\{${key}\\}\\}`, "g"); result = result.replace(regex, String(val)); } // NOW() 같은 SQL 함수는 그대로 반환 if (result === "NOW()" || result.startsWith("CURRENT_")) { return result; } return result; } /** * 롤백 쿼리 생성 */ private buildRollbackQuery( config: FlowExternalDbIntegrationConfig, context: FlowIntegrationContext, result: any ): { sql: string; params: any[] } | null { // 롤백 쿼리 생성 로직 (복잡하므로 실제 구현 시 상세 설계 필요) // 예: INSERT -> DELETE, UPDATE -> 이전 값으로 UPDATE switch (config.operation) { case "insert": // INSERT를 롤백하려면 삽입된 레코드를 DELETE if (result && result[0] && result[0].id) { return { sql: `DELETE FROM ${config.tableName} WHERE id = $1`, params: [result[0].id], }; } break; case "delete": // DELETE 롤백은 매우 어려움 (원본 데이터 필요) console.warn("DELETE 작업의 롤백은 지원하지 않습니다"); break; case "update": // UPDATE 롤백을 위해서는 이전 값을 저장해야 함 console.warn("UPDATE 작업의 롤백은 현재 구현되지 않았습니다"); break; default: break; } return null; } /** * 롤백 실행 */ async rollback( connectionId: number, rollbackQuery: { sql: string; params: any[] } ): Promise { const connection = await this.connectionService.findById(connectionId); if (!connection) { throw new Error( `롤백 실패: 연결 정보를 찾을 수 없습니다 (ID: ${connectionId})` ); } const pool = await this.connectionService.getConnectionPool(connection); await this.executeQuery(pool, rollbackQuery); } }