diff --git a/backend-node/src/services/batchService.ts b/backend-node/src/services/batchService.ts index 80cd9064..19ceea52 100644 --- a/backend-node/src/services/batchService.ts +++ b/backend-node/src/services/batchService.ts @@ -1,7 +1,7 @@ // 배치관리 서비스 // 작성일: 2024-12-24 -import prisma from "../config/database"; +import { query, queryOne, transaction } from "../database/db"; import { BatchConfig, BatchMapping, @@ -26,51 +26,72 @@ export class BatchService { filter: BatchConfigFilter ): Promise> { try { - const where: any = {}; + const whereConditions: string[] = []; + const values: any[] = []; + let paramIndex = 1; // 필터 조건 적용 if (filter.is_active) { - where.is_active = filter.is_active; + whereConditions.push(`bc.is_active = $${paramIndex++}`); + values.push(filter.is_active); } if (filter.company_code) { - where.company_code = filter.company_code; + whereConditions.push(`bc.company_code = $${paramIndex++}`); + values.push(filter.company_code); } - // 검색 조건 적용 + // 검색 조건 적용 (OR) if (filter.search && filter.search.trim()) { - where.OR = [ - { - batch_name: { - contains: filter.search.trim(), - mode: "insensitive", - }, - }, - { - description: { - contains: filter.search.trim(), - mode: "insensitive", - }, - }, - ]; + whereConditions.push( + `(bc.batch_name ILIKE $${paramIndex} OR bc.description ILIKE $${paramIndex})` + ); + values.push(`%${filter.search.trim()}%`); + paramIndex++; } + const whereClause = + whereConditions.length > 0 + ? `WHERE ${whereConditions.join(" AND ")}` + : ""; + const page = filter.page || 1; const limit = filter.limit || 10; - const skip = (page - 1) * limit; + const offset = (page - 1) * limit; - const [batchConfigs, total] = await Promise.all([ - prisma.batch_configs.findMany({ - where, - include: { - batch_mappings: true, - }, - orderBy: [{ is_active: "desc" }, { batch_name: "asc" }], - skip, - take: limit, - }), - prisma.batch_configs.count({ where }), - ]); + // 배치 설정 조회 (매핑 포함 - 서브쿼리 사용) + const batchConfigs = await query( + `SELECT bc.*, + COALESCE( + json_agg( + json_build_object( + 'mapping_id', bm.mapping_id, + 'batch_id', bm.batch_id, + 'source_column', bm.source_column, + 'target_column', bm.target_column, + 'transformation_rule', bm.transformation_rule + ) + ) FILTER (WHERE bm.mapping_id IS NOT NULL), + '[]' + ) as batch_mappings + FROM batch_configs bc + LEFT JOIN batch_mappings bm ON bc.batch_id = bm.batch_id + ${whereClause} + GROUP BY bc.batch_id + ORDER BY bc.is_active DESC, bc.batch_name ASC + LIMIT $${paramIndex} OFFSET $${paramIndex + 1}`, + [...values, limit, offset] + ); + + // 전체 개수 조회 + const countResult = await queryOne<{ count: string }>( + `SELECT COUNT(DISTINCT bc.batch_id) as count + FROM batch_configs bc + ${whereClause}`, + values + ); + + const total = parseInt(countResult?.count || "0"); return { success: true, @@ -99,18 +120,32 @@ export class BatchService { id: number ): Promise> { try { - const batchConfig = await prisma.batch_configs.findUnique({ - where: { id }, - include: { - batch_mappings: { - orderBy: [ - { from_table_name: "asc" }, - { from_column_name: "asc" }, - { mapping_order: "asc" }, - ], - }, - }, - }); + const batchConfig = await queryOne( + `SELECT bc.*, + COALESCE( + json_agg( + json_build_object( + 'mapping_id', bm.mapping_id, + 'batch_id', bm.batch_id, + 'from_table_name', bm.from_table_name, + 'from_column_name', bm.from_column_name, + 'to_table_name', bm.to_table_name, + 'to_column_name', bm.to_column_name, + 'mapping_order', bm.mapping_order, + 'source_column', bm.source_column, + 'target_column', bm.target_column, + 'transformation_rule', bm.transformation_rule + ) + ORDER BY bm.from_table_name ASC, bm.from_column_name ASC, bm.mapping_order ASC + ) FILTER (WHERE bm.mapping_id IS NOT NULL), + '[]' + ) as batch_mappings + FROM batch_configs bc + LEFT JOIN batch_mappings bm ON bc.batch_id = bm.batch_id + WHERE bc.id = $1 + GROUP BY bc.batch_id`, + [id] + ); if (!batchConfig) { return { @@ -142,51 +177,66 @@ export class BatchService { ): Promise> { try { // 트랜잭션으로 배치 설정과 매핑 생성 - const result = await prisma.$transaction(async (tx) => { + const result = await transaction(async (client) => { // 배치 설정 생성 - const batchConfig = await tx.batch_configs.create({ - data: { - batch_name: data.batchName, - description: data.description, - cron_schedule: data.cronSchedule, - created_by: userId, - updated_by: userId, - }, - }); + const batchConfigResult = await client.query( + `INSERT INTO batch_configs + (batch_name, description, cron_schedule, created_by, updated_by, created_date, updated_date) + VALUES ($1, $2, $3, $4, $5, NOW(), NOW()) + RETURNING *`, + [ + data.batchName, + data.description, + data.cronSchedule, + userId, + userId, + ] + ); + + const batchConfig = batchConfigResult.rows[0]; // 배치 매핑 생성 - const mappings = await Promise.all( - data.mappings.map((mapping, index) => - tx.batch_mappings.create({ - data: { - batch_config_id: batchConfig.id, - from_connection_type: mapping.from_connection_type, - from_connection_id: mapping.from_connection_id, - from_table_name: mapping.from_table_name, - from_column_name: mapping.from_column_name, - from_column_type: mapping.from_column_type, - from_api_url: mapping.from_api_url, - from_api_key: mapping.from_api_key, - from_api_method: mapping.from_api_method, - from_api_param_type: mapping.from_api_param_type, - from_api_param_name: mapping.from_api_param_name, - from_api_param_value: mapping.from_api_param_value, - from_api_param_source: mapping.from_api_param_source, - to_connection_type: mapping.to_connection_type, - to_connection_id: mapping.to_connection_id, - to_table_name: mapping.to_table_name, - to_column_name: mapping.to_column_name, - to_column_type: mapping.to_column_type, - to_api_url: mapping.to_api_url, - to_api_key: mapping.to_api_key, - to_api_method: mapping.to_api_method, - to_api_body: mapping.to_api_body, - mapping_order: mapping.mapping_order || index + 1, - created_by: userId, - }, - }) - ) - ); + const mappings = []; + for (let index = 0; index < data.mappings.length; index++) { + const mapping = data.mappings[index]; + const mappingResult = await client.query( + `INSERT INTO batch_mappings + (batch_config_id, from_connection_type, from_connection_id, from_table_name, from_column_name, + from_column_type, from_api_url, from_api_key, from_api_method, from_api_param_type, + from_api_param_name, from_api_param_value, from_api_param_source, + to_connection_type, to_connection_id, to_table_name, to_column_name, to_column_type, + to_api_url, to_api_key, to_api_method, to_api_body, mapping_order, created_by, created_date) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, NOW()) + RETURNING *`, + [ + batchConfig.id, + mapping.from_connection_type, + mapping.from_connection_id, + mapping.from_table_name, + mapping.from_column_name, + mapping.from_column_type, + mapping.from_api_url, + mapping.from_api_key, + mapping.from_api_method, + mapping.from_api_param_type, + mapping.from_api_param_name, + mapping.from_api_param_value, + mapping.from_api_param_source, + mapping.to_connection_type, + mapping.to_connection_id, + mapping.to_table_name, + mapping.to_column_name, + mapping.to_column_type, + mapping.to_api_url, + mapping.to_api_key, + mapping.to_api_method, + mapping.to_api_body, + mapping.mapping_order || index + 1, + userId, + ] + ); + mappings.push(mappingResult.rows[0]); + } return { ...batchConfig, @@ -219,10 +269,23 @@ export class BatchService { ): Promise> { try { // 기존 배치 설정 확인 - const existingConfig = await prisma.batch_configs.findUnique({ - where: { id }, - include: { batch_mappings: true }, - }); + const existingConfig = await queryOne( + `SELECT bc.*, + COALESCE( + json_agg( + json_build_object( + 'mapping_id', bm.mapping_id, + 'batch_id', bm.batch_id + ) + ) FILTER (WHERE bm.mapping_id IS NOT NULL), + '[]' + ) as batch_mappings + FROM batch_configs bc + LEFT JOIN batch_mappings bm ON bc.batch_id = bm.batch_id + WHERE bc.id = $1 + GROUP BY bc.batch_id`, + [id] + ); if (!existingConfig) { return { @@ -232,61 +295,89 @@ export class BatchService { } // 트랜잭션으로 업데이트 - const result = await prisma.$transaction(async (tx) => { + const result = await transaction(async (client) => { + // 동적 UPDATE 쿼리 생성 + const updateFields: string[] = ["updated_by = $1", "updated_date = NOW()"]; + const updateValues: any[] = [userId]; + let paramIndex = 2; + + if (data.batchName) { + updateFields.push(`batch_name = $${paramIndex++}`); + updateValues.push(data.batchName); + } + if (data.description !== undefined) { + updateFields.push(`description = $${paramIndex++}`); + updateValues.push(data.description); + } + if (data.cronSchedule) { + updateFields.push(`cron_schedule = $${paramIndex++}`); + updateValues.push(data.cronSchedule); + } + if (data.isActive !== undefined) { + updateFields.push(`is_active = $${paramIndex++}`); + updateValues.push(data.isActive); + } + // 배치 설정 업데이트 - const updateData: any = { - updated_by: userId, - }; + const batchConfigResult = await client.query( + `UPDATE batch_configs + SET ${updateFields.join(", ")} + WHERE id = $${paramIndex} + RETURNING *`, + [...updateValues, id] + ); - if (data.batchName) updateData.batch_name = data.batchName; - if (data.description !== undefined) updateData.description = data.description; - if (data.cronSchedule) updateData.cron_schedule = data.cronSchedule; - if (data.isActive !== undefined) updateData.is_active = data.isActive; - - const batchConfig = await tx.batch_configs.update({ - where: { id }, - data: updateData, - }); + const batchConfig = batchConfigResult.rows[0]; // 매핑이 제공된 경우 기존 매핑 삭제 후 새로 생성 if (data.mappings) { - await tx.batch_mappings.deleteMany({ - where: { batch_config_id: id }, - }); - - const mappings = await Promise.all( - data.mappings.map((mapping, index) => - tx.batch_mappings.create({ - data: { - batch_config_id: id, - from_connection_type: mapping.from_connection_type, - from_connection_id: mapping.from_connection_id, - from_table_name: mapping.from_table_name, - from_column_name: mapping.from_column_name, - from_column_type: mapping.from_column_type, - from_api_url: mapping.from_api_url, - from_api_key: mapping.from_api_key, - from_api_method: mapping.from_api_method, - from_api_param_type: mapping.from_api_param_type, - from_api_param_name: mapping.from_api_param_name, - from_api_param_value: mapping.from_api_param_value, - from_api_param_source: mapping.from_api_param_source, - to_connection_type: mapping.to_connection_type, - to_connection_id: mapping.to_connection_id, - to_table_name: mapping.to_table_name, - to_column_name: mapping.to_column_name, - to_column_type: mapping.to_column_type, - to_api_url: mapping.to_api_url, - to_api_key: mapping.to_api_key, - to_api_method: mapping.to_api_method, - to_api_body: mapping.to_api_body, - mapping_order: mapping.mapping_order || index + 1, - created_by: userId, - }, - }) - ) + await client.query( + `DELETE FROM batch_mappings WHERE batch_config_id = $1`, + [id] ); + const mappings = []; + for (let index = 0; index < data.mappings.length; index++) { + const mapping = data.mappings[index]; + const mappingResult = await client.query( + `INSERT INTO batch_mappings + (batch_config_id, from_connection_type, from_connection_id, from_table_name, from_column_name, + from_column_type, from_api_url, from_api_key, from_api_method, from_api_param_type, + from_api_param_name, from_api_param_value, from_api_param_source, + to_connection_type, to_connection_id, to_table_name, to_column_name, to_column_type, + to_api_url, to_api_key, to_api_method, to_api_body, mapping_order, created_by, created_date) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, NOW()) + RETURNING *`, + [ + id, + mapping.from_connection_type, + mapping.from_connection_id, + mapping.from_table_name, + mapping.from_column_name, + mapping.from_column_type, + mapping.from_api_url, + mapping.from_api_key, + mapping.from_api_method, + mapping.from_api_param_type, + mapping.from_api_param_name, + mapping.from_api_param_value, + mapping.from_api_param_source, + mapping.to_connection_type, + mapping.to_connection_id, + mapping.to_table_name, + mapping.to_column_name, + mapping.to_column_type, + mapping.to_api_url, + mapping.to_api_key, + mapping.to_api_method, + mapping.to_api_body, + mapping.mapping_order || index + 1, + userId, + ] + ); + mappings.push(mappingResult.rows[0]); + } + return { ...batchConfig, batch_mappings: mappings, @@ -322,9 +413,10 @@ export class BatchService { userId?: string ): Promise> { try { - const existingConfig = await prisma.batch_configs.findUnique({ - where: { id }, - }); + const existingConfig = await queryOne( + `SELECT * FROM batch_configs WHERE id = $1`, + [id] + ); if (!existingConfig) { return { @@ -333,14 +425,16 @@ export class BatchService { }; } - // 배치 매핑 먼저 삭제 (외래키 제약) - await prisma.batch_mappings.deleteMany({ - where: { batch_config_id: id } - }); + // 트랜잭션으로 삭제 + await transaction(async (client) => { + // 배치 매핑 먼저 삭제 (외래키 제약) + await client.query( + `DELETE FROM batch_mappings WHERE batch_config_id = $1`, + [id] + ); - // 배치 설정 삭제 - await prisma.batch_configs.delete({ - where: { id } + // 배치 설정 삭제 + await client.query(`DELETE FROM batch_configs WHERE id = $1`, [id]); }); return { @@ -360,24 +454,27 @@ export class BatchService { /** * 사용 가능한 커넥션 목록 조회 */ - static async getAvailableConnections(): Promise> { + static async getAvailableConnections(): Promise< + ApiResponse + > { try { const connections: ConnectionInfo[] = []; // 내부 DB 추가 connections.push({ - type: 'internal', - name: 'Internal Database', - db_type: 'postgresql', + type: "internal", + name: "Internal Database", + db_type: "postgresql", }); // 외부 DB 연결 조회 - const externalConnections = await BatchExternalDbService.getAvailableConnections(); + const externalConnections = + await BatchExternalDbService.getAvailableConnections(); if (externalConnections.success && externalConnections.data) { externalConnections.data.forEach((conn) => { connections.push({ - type: 'external', + type: "external", id: conn.id, name: conn.name, db_type: conn.db_type, @@ -403,28 +500,32 @@ export class BatchService { * 특정 커넥션의 테이블 목록 조회 */ static async getTablesFromConnection( - connectionType: 'internal' | 'external', + connectionType: "internal" | "external", connectionId?: number ): Promise> { try { let tables: TableInfo[] = []; - if (connectionType === 'internal') { + if (connectionType === "internal") { // 내부 DB 테이블 조회 - const result = await prisma.$queryRaw>` - SELECT table_name - FROM information_schema.tables - WHERE table_schema = 'public' - AND table_type = 'BASE TABLE' - ORDER BY table_name - `; - tables = result.map(row => ({ + const result = await query<{ table_name: string }>( + `SELECT table_name + FROM information_schema.tables + WHERE table_schema = 'public' + AND table_type = 'BASE TABLE' + ORDER BY table_name` + ); + tables = result.map((row) => ({ table_name: row.table_name, - columns: [] + columns: [], })); - } else if (connectionType === 'external' && connectionId) { + } else if (connectionType === "external" && connectionId) { // 외부 DB 테이블 조회 - const tablesResult = await BatchExternalDbService.getTablesFromConnection(connectionType, connectionId); + const tablesResult = + await BatchExternalDbService.getTablesFromConnection( + connectionType, + connectionId + ); if (tablesResult.success && tablesResult.data) { tables = tablesResult.data; } @@ -448,7 +549,7 @@ export class BatchService { * 특정 테이블의 컬럼 정보 조회 */ static async getTableColumns( - connectionType: 'internal' | 'external', + connectionType: "internal" | "external", connectionId: number | undefined, tableName: string ): Promise> { @@ -456,56 +557,59 @@ export class BatchService { console.log(`[BatchService] getTableColumns 호출:`, { connectionType, connectionId, - tableName + tableName, }); - + let columns: ColumnInfo[] = []; - - if (connectionType === 'internal') { + + if (connectionType === "internal") { // 내부 DB 컬럼 조회 console.log(`[BatchService] 내부 DB 컬럼 조회 시작: ${tableName}`); - - const result = await prisma.$queryRaw>` - SELECT - column_name, - data_type, - is_nullable, - column_default - FROM information_schema.columns - WHERE table_schema = 'public' - AND table_name = ${tableName} - ORDER BY ordinal_position - `; + }>( + `SELECT + column_name, + data_type, + is_nullable, + column_default + FROM information_schema.columns + WHERE table_schema = 'public' + AND table_name = $1 + ORDER BY ordinal_position`, + [tableName] + ); console.log(`[BatchService] 내부 DB 컬럼 조회 결과:`, result); - columns = result.map(row => ({ + columns = result.map((row) => ({ column_name: row.column_name, data_type: row.data_type, is_nullable: row.is_nullable, column_default: row.column_default, })); - } else if (connectionType === 'external' && connectionId) { + } else if (connectionType === "external" && connectionId) { // 외부 DB 컬럼 조회 - console.log(`[BatchService] 외부 DB 컬럼 조회 시작: connectionId=${connectionId}, tableName=${tableName}`); - + console.log( + `[BatchService] 외부 DB 컬럼 조회 시작: connectionId=${connectionId}, tableName=${tableName}` + ); + const columnsResult = await BatchExternalDbService.getTableColumns( connectionType, connectionId, tableName ); - + console.log(`[BatchService] 외부 DB 컬럼 조회 결과:`, columnsResult); - + if (columnsResult.success && columnsResult.data) { columns = columnsResult.data; } - + console.log(`[BatchService] 외부 DB 컬럼:`, columns); } @@ -535,16 +639,20 @@ export class BatchService { failed_records: number; }): Promise { try { - const executionLog = await prisma.batch_execution_logs.create({ - data: { - batch_config_id: data.batch_config_id, - execution_status: data.execution_status, - start_time: data.start_time, - total_records: data.total_records, - success_records: data.success_records, - failed_records: data.failed_records, - }, - }); + const executionLog = await queryOne( + `INSERT INTO batch_execution_logs + (batch_config_id, execution_status, start_time, total_records, success_records, failed_records) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING *`, + [ + data.batch_config_id, + data.execution_status, + data.start_time, + data.total_records, + data.success_records, + data.failed_records, + ] + ); return executionLog; } catch (error) { @@ -569,10 +677,48 @@ export class BatchService { } ): Promise { try { - await prisma.batch_execution_logs.update({ - where: { id }, - data, - }); + // 동적 UPDATE 쿼리 생성 + const updateFields: string[] = []; + const values: any[] = []; + let paramIndex = 1; + + if (data.execution_status !== undefined) { + updateFields.push(`execution_status = $${paramIndex++}`); + values.push(data.execution_status); + } + if (data.end_time !== undefined) { + updateFields.push(`end_time = $${paramIndex++}`); + values.push(data.end_time); + } + if (data.duration_ms !== undefined) { + updateFields.push(`duration_ms = $${paramIndex++}`); + values.push(data.duration_ms); + } + if (data.total_records !== undefined) { + updateFields.push(`total_records = $${paramIndex++}`); + values.push(data.total_records); + } + if (data.success_records !== undefined) { + updateFields.push(`success_records = $${paramIndex++}`); + values.push(data.success_records); + } + if (data.failed_records !== undefined) { + updateFields.push(`failed_records = $${paramIndex++}`); + values.push(data.failed_records); + } + if (data.error_message !== undefined) { + updateFields.push(`error_message = $${paramIndex++}`); + values.push(data.error_message); + } + + if (updateFields.length > 0) { + await query( + `UPDATE batch_execution_logs + SET ${updateFields.join(", ")} + WHERE id = $${paramIndex}`, + [...values, id] + ); + } } catch (error) { console.error("배치 실행 로그 업데이트 오류:", error); throw error; @@ -584,29 +730,42 @@ export class BatchService { */ static async getDataFromTable( tableName: string, - connectionType: 'internal' | 'external' = 'internal', + connectionType: "internal" | "external" = "internal", connectionId?: number ): Promise { try { - console.log(`[BatchService] 테이블에서 데이터 조회: ${tableName} (${connectionType}${connectionId ? `:${connectionId}` : ''})`); - - if (connectionType === 'internal') { - // 내부 DB에서 데이터 조회 - const result = await prisma.$queryRawUnsafe(`SELECT * FROM ${tableName} LIMIT 100`); - console.log(`[BatchService] 내부 DB 데이터 조회 결과: ${Array.isArray(result) ? result.length : 0}개 레코드`); - return result as any[]; - } else if (connectionType === 'external' && connectionId) { + console.log( + `[BatchService] 테이블에서 데이터 조회: ${tableName} (${connectionType}${connectionId ? `:${connectionId}` : ""})` + ); + + if (connectionType === "internal") { + // 내부 DB에서 데이터 조회 (주의: SQL 인젝션 위험 - 실제 프로덕션에서는 테이블명 검증 필요) + const result = await query( + `SELECT * FROM ${tableName} LIMIT 100` + ); + console.log( + `[BatchService] 내부 DB 데이터 조회 결과: ${result.length}개 레코드` + ); + return result; + } else if (connectionType === "external" && connectionId) { // 외부 DB에서 데이터 조회 - const result = await BatchExternalDbService.getDataFromTable(connectionId, tableName); + const result = await BatchExternalDbService.getDataFromTable( + connectionId, + tableName + ); if (result.success && result.data) { - console.log(`[BatchService] 외부 DB 데이터 조회 결과: ${result.data.length}개 레코드`); + console.log( + `[BatchService] 외부 DB 데이터 조회 결과: ${result.data.length}개 레코드` + ); return result.data; } else { console.error(`외부 DB 데이터 조회 실패: ${result.message}`); return []; } } else { - throw new Error(`잘못된 연결 타입 또는 연결 ID: ${connectionType}, ${connectionId}`); + throw new Error( + `잘못된 연결 타입 또는 연결 ID: ${connectionType}, ${connectionId}` + ); } } catch (error) { console.error(`테이블 데이터 조회 오류 (${tableName}):`, error); @@ -620,30 +779,44 @@ export class BatchService { static async getDataFromTableWithColumns( tableName: string, columns: string[], - connectionType: 'internal' | 'external' = 'internal', + connectionType: "internal" | "external" = "internal", connectionId?: number ): Promise { try { - console.log(`[BatchService] 테이블에서 특정 컬럼 데이터 조회: ${tableName} (${columns.join(', ')}) (${connectionType}${connectionId ? `:${connectionId}` : ''})`); - - if (connectionType === 'internal') { - // 내부 DB에서 특정 컬럼만 조회 - const columnList = columns.join(', '); - const result = await prisma.$queryRawUnsafe(`SELECT ${columnList} FROM ${tableName} LIMIT 100`); - console.log(`[BatchService] 내부 DB 특정 컬럼 조회 결과: ${Array.isArray(result) ? result.length : 0}개 레코드`); - return result as any[]; - } else if (connectionType === 'external' && connectionId) { + console.log( + `[BatchService] 테이블에서 특정 컬럼 데이터 조회: ${tableName} (${columns.join(", ")}) (${connectionType}${connectionId ? `:${connectionId}` : ""})` + ); + + if (connectionType === "internal") { + // 내부 DB에서 특정 컬럼만 조회 (주의: SQL 인젝션 위험 - 실제 프로덕션에서는 테이블명/컬럼명 검증 필요) + const columnList = columns.join(", "); + const result = await query( + `SELECT ${columnList} FROM ${tableName} LIMIT 100` + ); + console.log( + `[BatchService] 내부 DB 특정 컬럼 조회 결과: ${result.length}개 레코드` + ); + return result; + } else if (connectionType === "external" && connectionId) { // 외부 DB에서 특정 컬럼만 조회 - const result = await BatchExternalDbService.getDataFromTableWithColumns(connectionId, tableName, columns); + const result = await BatchExternalDbService.getDataFromTableWithColumns( + connectionId, + tableName, + columns + ); if (result.success && result.data) { - console.log(`[BatchService] 외부 DB 특정 컬럼 조회 결과: ${result.data.length}개 레코드`); + console.log( + `[BatchService] 외부 DB 특정 컬럼 조회 결과: ${result.data.length}개 레코드` + ); return result.data; } else { console.error(`외부 DB 특정 컬럼 조회 실패: ${result.message}`); return []; } } else { - throw new Error(`잘못된 연결 타입 또는 연결 ID: ${connectionType}, ${connectionId}`); + throw new Error( + `잘못된 연결 타입 또는 연결 ID: ${connectionType}, ${connectionId}` + ); } } catch (error) { console.error(`테이블 특정 컬럼 조회 오류 (${tableName}):`, error); @@ -657,20 +830,22 @@ export class BatchService { static async insertDataToTable( tableName: string, data: any[], - connectionType: 'internal' | 'external' = 'internal', + connectionType: "internal" | "external" = "internal", connectionId?: number ): Promise<{ successCount: number; failedCount: number; }> { try { - console.log(`[BatchService] 테이블에 데이터 삽입: ${tableName} (${connectionType}${connectionId ? `:${connectionId}` : ''}), ${data.length}개 레코드`); - + console.log( + `[BatchService] 테이블에 데이터 삽입: ${tableName} (${connectionType}${connectionId ? `:${connectionId}` : ""}), ${data.length}개 레코드` + ); + if (!data || data.length === 0) { return { successCount: 0, failedCount: 0 }; } - if (connectionType === 'internal') { + if (connectionType === "internal") { // 내부 DB에 데이터 삽입 let successCount = 0; let failedCount = 0; @@ -680,99 +855,132 @@ export class BatchService { try { // 동적 UPSERT 쿼리 생성 (PostgreSQL ON CONFLICT 사용) const columns = Object.keys(record); - const values = Object.values(record).map(value => { + const values = Object.values(record).map((value) => { // Date 객체를 ISO 문자열로 변환 (PostgreSQL이 자동으로 파싱) if (value instanceof Date) { return value.toISOString(); } // JavaScript Date 문자열을 Date 객체로 변환 후 ISO 문자열로 - if (typeof value === 'string') { - const dateRegex = /^(Mon|Tue|Wed|Thu|Fri|Sat|Sun)\s+(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+\d{2}\s+\d{4}\s+\d{2}:\d{2}:\d{2}/; + if (typeof value === "string") { + const dateRegex = + /^(Mon|Tue|Wed|Thu|Fri|Sat|Sun)\s+(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+\d{2}\s+\d{4}\s+\d{2}:\d{2}:\d{2}/; if (dateRegex.test(value)) { return new Date(value).toISOString(); } // ISO 날짜 문자열 형식 체크 (2025-09-24T06:29:01.351Z) - const isoDateRegex = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d{3})?Z?$/; + const isoDateRegex = + /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d{3})?Z?$/; if (isoDateRegex.test(value)) { return new Date(value).toISOString(); } } return value; }); - + // PostgreSQL 타입 캐스팅을 위한 placeholder 생성 - const placeholders = columns.map((col, index) => { - // 날짜/시간 관련 컬럼명 패턴 체크 - if (col.toLowerCase().includes('date') || - col.toLowerCase().includes('time') || - col.toLowerCase().includes('created') || - col.toLowerCase().includes('updated') || - col.toLowerCase().includes('reg')) { - return `$${index + 1}::timestamp`; - } - return `$${index + 1}`; - }).join(', '); - + const placeholders = columns + .map((col, index) => { + // 날짜/시간 관련 컬럼명 패턴 체크 + if ( + col.toLowerCase().includes("date") || + col.toLowerCase().includes("time") || + col.toLowerCase().includes("created") || + col.toLowerCase().includes("updated") || + col.toLowerCase().includes("reg") + ) { + return `$${index + 1}::timestamp`; + } + return `$${index + 1}`; + }) + .join(", "); + // Primary Key 컬럼 추정 (일반적으로 id 또는 첫 번째 컬럼) - const primaryKeyColumn = columns.includes('id') ? 'id' : - columns.includes('user_id') ? 'user_id' : - columns[0]; - + const primaryKeyColumn = columns.includes("id") + ? "id" + : columns.includes("user_id") + ? "user_id" + : columns[0]; + // UPDATE SET 절 생성 (Primary Key 제외) - const updateColumns = columns.filter(col => col !== primaryKeyColumn); - const updateSet = updateColumns.map(col => `${col} = EXCLUDED.${col}`).join(', '); - + const updateColumns = columns.filter( + (col) => col !== primaryKeyColumn + ); + const updateSet = updateColumns + .map((col) => `${col} = EXCLUDED.${col}`) + .join(", "); + // 트랜잭션 내에서 처리하여 연결 관리 최적화 - const result = await prisma.$transaction(async (tx) => { + const result = await transaction(async (client) => { // 먼저 해당 레코드가 존재하는지 확인 const checkQuery = `SELECT COUNT(*) as count FROM ${tableName} WHERE ${primaryKeyColumn} = $1`; - const existsResult = await tx.$queryRawUnsafe(checkQuery, record[primaryKeyColumn]); - const exists = (existsResult as any)[0]?.count > 0; - - let operationResult = 'no_change'; - + const existsResult = await client.query( + checkQuery, + [record[primaryKeyColumn]] + ); + const exists = parseInt(existsResult.rows[0]?.count || "0") > 0; + + let operationResult = "no_change"; + if (exists && updateSet) { // 기존 레코드가 있으면 UPDATE (값이 다른 경우에만) - const whereConditions = updateColumns.map((col, index) => { - // 날짜/시간 컬럼에 대한 타입 캐스팅 처리 - if (col.toLowerCase().includes('date') || - col.toLowerCase().includes('time') || - col.toLowerCase().includes('created') || - col.toLowerCase().includes('updated') || - col.toLowerCase().includes('reg')) { - return `${col} IS DISTINCT FROM $${index + 2}::timestamp`; - } - return `${col} IS DISTINCT FROM $${index + 2}`; - }).join(' OR '); - - const query = `UPDATE ${tableName} SET ${updateSet.replace(/EXCLUDED\./g, '')} + const whereConditions = updateColumns + .map((col, index) => { + // 날짜/시간 컬럼에 대한 타입 캐스팅 처리 + if ( + col.toLowerCase().includes("date") || + col.toLowerCase().includes("time") || + col.toLowerCase().includes("created") || + col.toLowerCase().includes("updated") || + col.toLowerCase().includes("reg") + ) { + return `${col} IS DISTINCT FROM $${index + 2}::timestamp`; + } + return `${col} IS DISTINCT FROM $${index + 2}`; + }) + .join(" OR "); + + const query = `UPDATE ${tableName} SET ${updateSet.replace(/EXCLUDED\./g, "")} WHERE ${primaryKeyColumn} = $1 AND (${whereConditions})`; - + // 파라미터: [primaryKeyValue, ...updateValues] - const updateValues = [record[primaryKeyColumn], ...updateColumns.map(col => record[col])]; - const updateResult = await tx.$executeRawUnsafe(query, ...updateValues); - - if (updateResult > 0) { - console.log(`[BatchService] 레코드 업데이트: ${primaryKeyColumn}=${record[primaryKeyColumn]}`); - operationResult = 'updated'; + const updateValues = [ + record[primaryKeyColumn], + ...updateColumns.map((col) => record[col]), + ]; + const updateResult = await client.query( + query, + updateValues + ); + + if (updateResult.rowCount && updateResult.rowCount > 0) { + console.log( + `[BatchService] 레코드 업데이트: ${primaryKeyColumn}=${record[primaryKeyColumn]}` + ); + operationResult = "updated"; } else { - console.log(`[BatchService] 레코드 변경사항 없음: ${primaryKeyColumn}=${record[primaryKeyColumn]}`); - operationResult = 'no_change'; + console.log( + `[BatchService] 레코드 변경사항 없음: ${primaryKeyColumn}=${record[primaryKeyColumn]}` + ); + operationResult = "no_change"; } } else if (!exists) { // 새 레코드 삽입 - const query = `INSERT INTO ${tableName} (${columns.join(', ')}) VALUES (${placeholders})`; - await tx.$executeRawUnsafe(query, ...values); - console.log(`[BatchService] 새 레코드 삽입: ${primaryKeyColumn}=${record[primaryKeyColumn]}`); - operationResult = 'inserted'; + const query = `INSERT INTO ${tableName} (${columns.join(", ")}) VALUES (${placeholders})`; + await client.query(query, values); + console.log( + `[BatchService] 새 레코드 삽입: ${primaryKeyColumn}=${record[primaryKeyColumn]}` + ); + operationResult = "inserted"; } else { - console.log(`[BatchService] 레코드 이미 존재 (변경사항 없음): ${primaryKeyColumn}=${record[primaryKeyColumn]}`); - operationResult = 'no_change'; + console.log( + `[BatchService] 레코드 이미 존재 (변경사항 없음): ${primaryKeyColumn}=${record[primaryKeyColumn]}` + ); + operationResult = "no_change"; } - + return operationResult; }); - + successCount++; } catch (error) { console.error(`레코드 UPSERT 실패:`, error); @@ -780,21 +988,34 @@ export class BatchService { } } - console.log(`[BatchService] 내부 DB 데이터 삽입 완료: 성공 ${successCount}개, 실패 ${failedCount}개`); + console.log( + `[BatchService] 내부 DB 데이터 삽입 완료: 성공 ${successCount}개, 실패 ${failedCount}개` + ); return { successCount, failedCount }; - } else if (connectionType === 'external' && connectionId) { + } else if (connectionType === "external" && connectionId) { // 외부 DB에 데이터 삽입 - const result = await BatchExternalDbService.insertDataToTable(connectionId, tableName, data); + const result = await BatchExternalDbService.insertDataToTable( + connectionId, + tableName, + data + ); if (result.success && result.data) { - console.log(`[BatchService] 외부 DB 데이터 삽입 완료: 성공 ${result.data.successCount}개, 실패 ${result.data.failedCount}개`); + console.log( + `[BatchService] 외부 DB 데이터 삽입 완료: 성공 ${result.data.successCount}개, 실패 ${result.data.failedCount}개` + ); return result.data; } else { console.error(`외부 DB 데이터 삽입 실패: ${result.message}`); return { successCount: 0, failedCount: data.length }; } } else { - console.log(`[BatchService] 연결 정보 디버그:`, { connectionType, connectionId }); - throw new Error(`잘못된 연결 타입 또는 연결 ID: ${connectionType}, ${connectionId}`); + console.log(`[BatchService] 연결 정보 디버그:`, { + connectionType, + connectionId, + }); + throw new Error( + `잘못된 연결 타입 또는 연결 ID: ${connectionType}, ${connectionId}` + ); } } catch (error) { console.error(`테이블 데이터 삽입 오류 (${tableName}):`, error); @@ -818,10 +1039,10 @@ export class BatchService { // n:1 매핑 검사 (여러 FROM이 같은 TO로 매핑되는 것 방지) const toMappings = new Map(); - + mappings.forEach((mapping, index) => { - const toKey = `${mapping.to_connection_type}:${mapping.to_connection_id || 'internal'}:${mapping.to_table_name}:${mapping.to_column_name}`; - + const toKey = `${mapping.to_connection_type}:${mapping.to_connection_id || "internal"}:${mapping.to_table_name}:${mapping.to_column_name}`; + if (toMappings.has(toKey)) { errors.push( `매핑 ${index + 1}: TO 컬럼 '${mapping.to_table_name}.${mapping.to_column_name}'에 중복 매핑이 있습니다. n:1 매핑은 허용되지 않습니다.` @@ -833,10 +1054,10 @@ export class BatchService { // 1:n 매핑 경고 (같은 FROM에서 여러 TO로 매핑) const fromMappings = new Map(); - + mappings.forEach((mapping, index) => { - const fromKey = `${mapping.from_connection_type}:${mapping.from_connection_id || 'internal'}:${mapping.from_table_name}:${mapping.from_column_name}`; - + const fromKey = `${mapping.from_connection_type}:${mapping.from_connection_id || "internal"}:${mapping.from_table_name}:${mapping.from_column_name}`; + if (!fromMappings.has(fromKey)) { fromMappings.set(fromKey, []); } @@ -845,7 +1066,7 @@ export class BatchService { fromMappings.forEach((indices, fromKey) => { if (indices.length > 1) { - const [, , tableName, columnName] = fromKey.split(':'); + const [, , tableName, columnName] = fromKey.split(":"); warnings.push( `FROM 컬럼 '${tableName}.${columnName}'에서 ${indices.length}개의 TO 컬럼으로 매핑됩니다. (1:n 매핑)` );