From 1be8771e01ad0c96f167551c013b1700392902c0 Mon Sep 17 00:00:00 2001 From: hjjeong Date: Mon, 29 Sep 2025 16:55:37 +0900 Subject: [PATCH] =?UTF-8?q?rest=20api=20get=20=ED=8C=8C=EB=9D=BC=EB=AF=B8?= =?UTF-8?q?=ED=84=B0=20=EC=84=A4=EC=A0=95=20=EA=B0=9C=EB=B0=9C=EC=A4=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/controllers/batchController.ts | 8 +- .../controllers/batchManagementController.ts | 240 ++++-------------- backend-node/src/database/RestApiConnector.ts | 2 +- .../src/services/batchSchedulerService.ts | 66 ++++- backend-node/src/services/batchService.ts | 74 ++++-- 5 files changed, 162 insertions(+), 228 deletions(-) diff --git a/backend-node/src/controllers/batchController.ts b/backend-node/src/controllers/batchController.ts index f91b5d25..0858cc37 100644 --- a/backend-node/src/controllers/batchController.ts +++ b/backend-node/src/controllers/batchController.ts @@ -192,9 +192,9 @@ export class BatchController { mappings } as CreateBatchConfigRequest); - // 생성된 배치가 활성화 상태라면 스케줄러에 등록 + // 생성된 배치가 활성화 상태라면 스케줄러에 등록 (즉시 실행 비활성화) if (batchConfig.data && batchConfig.data.is_active === 'Y' && batchConfig.data.id) { - await BatchSchedulerService.updateBatchSchedule(batchConfig.data.id); + await BatchSchedulerService.updateBatchSchedule(batchConfig.data.id, false); } return res.status(201).json({ @@ -242,8 +242,8 @@ export class BatchController { }); } - // 스케줄러에서 배치 스케줄 업데이트 (활성화 시 즉시 스케줄 등록) - await BatchSchedulerService.updateBatchSchedule(Number(id)); + // 스케줄러에서 배치 스케줄 업데이트 (즉시 실행 비활성화) + await BatchSchedulerService.updateBatchSchedule(Number(id), false); return res.json({ success: true, diff --git a/backend-node/src/controllers/batchManagementController.ts b/backend-node/src/controllers/batchManagementController.ts index cbff6bc3..71640577 100644 --- a/backend-node/src/controllers/batchManagementController.ts +++ b/backend-node/src/controllers/batchManagementController.ts @@ -224,18 +224,15 @@ export class BatchManagementController { } const batchConfig = batchConfigResult.data as BatchConfig; - - // 배치 실행 로직 (간단한 버전) const startTime = new Date(); - let totalRecords = 0; - let successRecords = 0; - let failedRecords = 0; + console.log(`배치 수동 실행 시작: ${batchConfig.batch_name} (ID: ${id})`); + + let executionLog: any = null; + try { - console.log(`배치 실행 시작: ${batchConfig.batch_name} (ID: ${id})`); - // 실행 로그 생성 - const executionLog = await BatchService.createExecutionLog({ + executionLog = await BatchService.createExecutionLog({ batch_config_id: Number(id), execution_status: 'RUNNING', start_time: startTime, @@ -244,205 +241,74 @@ export class BatchManagementController { failed_records: 0 }); - // 실제 배치 실행 (매핑이 있는 경우) - if (batchConfig.batch_mappings && batchConfig.batch_mappings.length > 0) { - // 테이블별로 매핑을 그룹화 - const tableGroups = new Map(); - - for (const mapping of batchConfig.batch_mappings) { - const key = `${mapping.from_connection_type}:${mapping.from_connection_id || 'internal'}:${mapping.from_table_name}`; - if (!tableGroups.has(key)) { - tableGroups.set(key, []); - } - tableGroups.get(key)!.push(mapping); - } + // BatchSchedulerService의 executeBatchConfig 메서드 사용 (중복 로직 제거) + const { BatchSchedulerService } = await import('../services/batchSchedulerService'); + const result = await BatchSchedulerService.executeBatchConfig(batchConfig); - // 각 테이블 그룹별로 처리 - for (const [tableKey, mappings] of tableGroups) { - try { - const firstMapping = mappings[0]; - console.log(`테이블 처리 시작: ${tableKey} -> ${mappings.length}개 컬럼 매핑`); - - let fromData: any[] = []; - - // FROM 데이터 조회 (DB 또는 REST API) - if (firstMapping.from_connection_type === 'restapi') { - // REST API에서 데이터 조회 - console.log(`REST API에서 데이터 조회: ${firstMapping.from_api_url}${firstMapping.from_table_name}`); - console.log(`API 설정:`, { - url: firstMapping.from_api_url, - key: firstMapping.from_api_key ? '***' : 'null', - method: firstMapping.from_api_method, - endpoint: firstMapping.from_table_name - }); - - try { - const apiResult = await BatchExternalDbService.getDataFromRestApi( - firstMapping.from_api_url!, - firstMapping.from_api_key!, - firstMapping.from_table_name, - firstMapping.from_api_method as 'GET' | 'POST' | 'PUT' | 'DELETE' || 'GET', - mappings.map(m => m.from_column_name), - 100, // limit - // 파라미터 정보 전달 - firstMapping.from_api_param_type, - firstMapping.from_api_param_name, - firstMapping.from_api_param_value, - firstMapping.from_api_param_source - ); - - console.log(`API 조회 결과:`, { - success: apiResult.success, - dataCount: apiResult.data ? apiResult.data.length : 0, - message: apiResult.message - }); - - if (apiResult.success && apiResult.data) { - fromData = apiResult.data; - } else { - throw new Error(`REST API 데이터 조회 실패: ${apiResult.message}`); - } - } catch (error) { - console.error(`REST API 조회 오류:`, error); - throw error; - } - } else { - // DB에서 데이터 조회 - const fromColumns = mappings.map(m => m.from_column_name); - fromData = await BatchService.getDataFromTableWithColumns( - firstMapping.from_table_name, - fromColumns, - firstMapping.from_connection_type as 'internal' | 'external', - firstMapping.from_connection_id || undefined - ); - } - - totalRecords += fromData.length; - - // 컬럼 매핑 적용하여 TO 테이블 형식으로 변환 - const mappedData = fromData.map(row => { - const mappedRow: any = {}; - for (const mapping of mappings) { - // DB → REST API 배치인지 확인 - if (firstMapping.to_connection_type === 'restapi' && mapping.to_api_body) { - // DB → REST API: 원본 컬럼명을 키로 사용 (템플릿 처리용) - mappedRow[mapping.from_column_name] = row[mapping.from_column_name]; - } else { - // 기존 로직: to_column_name을 키로 사용 - mappedRow[mapping.to_column_name] = row[mapping.from_column_name]; - } - } - return mappedRow; - }); - - // TO 테이블에 데이터 삽입 (DB 또는 REST API) - let insertResult: { successCount: number; failedCount: number }; - - if (firstMapping.to_connection_type === 'restapi') { - // REST API로 데이터 전송 - console.log(`REST API로 데이터 전송: ${firstMapping.to_api_url}${firstMapping.to_table_name}`); - - // DB → REST API 배치인지 확인 (to_api_body가 있으면 템플릿 기반) - const hasTemplate = mappings.some(m => m.to_api_body); - - if (hasTemplate) { - // 템플릿 기반 REST API 전송 (DB → REST API 배치) - const templateBody = firstMapping.to_api_body || '{}'; - console.log(`템플릿 기반 REST API 전송, Request Body 템플릿: ${templateBody}`); - - // URL 경로 컬럼 찾기 (PUT/DELETE용) - const urlPathColumn = mappings.find(m => m.to_column_name === 'URL_PATH_PARAM')?.from_column_name; - - const apiResult = await BatchExternalDbService.sendDataToRestApiWithTemplate( - firstMapping.to_api_url!, - firstMapping.to_api_key!, - firstMapping.to_table_name, - firstMapping.to_api_method as 'POST' | 'PUT' | 'DELETE' || 'POST', - templateBody, - mappedData, - urlPathColumn - ); - - if (apiResult.success && apiResult.data) { - insertResult = apiResult.data; - } else { - throw new Error(`템플릿 기반 REST API 데이터 전송 실패: ${apiResult.message}`); - } - } else { - // 기존 REST API 전송 (REST API → DB 배치) - const apiResult = await BatchExternalDbService.sendDataToRestApi( - firstMapping.to_api_url!, - firstMapping.to_api_key!, - firstMapping.to_table_name, - firstMapping.to_api_method as 'POST' | 'PUT' || 'POST', - mappedData - ); - - if (apiResult.success && apiResult.data) { - insertResult = apiResult.data; - } else { - throw new Error(`REST API 데이터 전송 실패: ${apiResult.message}`); - } - } - } else { - // DB에 데이터 삽입 - insertResult = await BatchService.insertDataToTable( - firstMapping.to_table_name, - mappedData, - firstMapping.to_connection_type as 'internal' | 'external', - firstMapping.to_connection_id || undefined - ); - } - - successRecords += insertResult.successCount; - failedRecords += insertResult.failedCount; - - console.log(`테이블 처리 완료: ${insertResult.successCount}개 성공, ${insertResult.failedCount}개 실패`); - } catch (error) { - console.error(`테이블 처리 실패: ${tableKey}`, error); - failedRecords += 1; - } - } - } else { - console.log("매핑이 없어서 데이터 처리를 건너뜁니다."); + // result가 undefined인 경우 처리 + if (!result) { + throw new Error('배치 실행 결과를 받을 수 없습니다.'); } + const endTime = new Date(); + const duration = endTime.getTime() - startTime.getTime(); + // 실행 로그 업데이트 (성공) await BatchService.updateExecutionLog(executionLog.id, { execution_status: 'SUCCESS', - end_time: new Date(), - duration_ms: Date.now() - startTime.getTime(), - total_records: totalRecords, - success_records: successRecords, - failed_records: failedRecords + end_time: endTime, + duration_ms: duration, + total_records: result.totalRecords, + success_records: result.successRecords, + failed_records: result.failedRecords }); return res.json({ success: true, - message: "배치가 성공적으로 실행되었습니다.", data: { - batchId: id, - totalRecords, - successRecords, - failedRecords, - duration: Date.now() - startTime.getTime() - } + batchName: batchConfig.batch_name, + totalRecords: result.totalRecords, + successRecords: result.successRecords, + failedRecords: result.failedRecords, + executionTime: duration + }, + message: "배치가 성공적으로 실행되었습니다." }); - } catch (error) { - console.error(`배치 실행 실패: ${batchConfig.batch_name}`, error); + + } catch (batchError) { + console.error(`배치 실행 실패: ${batchConfig.batch_name}`, batchError); + // 실행 로그 업데이트 (실패) - executionLog가 생성되었을 경우에만 + try { + const endTime = new Date(); + const duration = endTime.getTime() - startTime.getTime(); + + // executionLog가 정의되어 있는지 확인 + if (typeof executionLog !== 'undefined') { + await BatchService.updateExecutionLog(executionLog.id, { + execution_status: 'FAILED', + end_time: endTime, + duration_ms: duration, + error_message: batchError instanceof Error ? batchError.message : "알 수 없는 오류" + }); + } + } catch (logError) { + console.error('실행 로그 업데이트 실패:', logError); + } + return res.status(500).json({ success: false, message: "배치 실행에 실패했습니다.", - error: error instanceof Error ? error.message : "알 수 없는 오류" + error: batchError instanceof Error ? batchError.message : "알 수 없는 오류" }); } + } catch (error) { - console.error("배치 실행 오류:", error); + console.error(`배치 실행 오류 (ID: ${req.params.id}):`, error); return res.status(500).json({ success: false, message: "배치 실행 중 오류가 발생했습니다.", - error: error instanceof Error ? error.message : "알 수 없는 오류" + error: error instanceof Error ? error.message : "Unknown error" }); } } @@ -465,8 +331,8 @@ export class BatchManagementController { const batchConfig = await BatchService.updateBatchConfig(Number(id), updateData); - // 스케줄러에서 배치 스케줄 업데이트 - await BatchSchedulerService.updateBatchSchedule(Number(id)); + // 스케줄러에서 배치 스케줄 업데이트 (즉시 실행 비활성화) + await BatchSchedulerService.updateBatchSchedule(Number(id), false); return res.json({ success: true, diff --git a/backend-node/src/database/RestApiConnector.ts b/backend-node/src/database/RestApiConnector.ts index 98da0eb3..4ce0039e 100644 --- a/backend-node/src/database/RestApiConnector.ts +++ b/backend-node/src/database/RestApiConnector.ts @@ -27,7 +27,7 @@ export class RestApiConnector implements DatabaseConnector { timeout: config.timeout || 30000, headers: { 'Content-Type': 'application/json', - 'X-API-Key': config.apiKey, + 'Authorization': `Bearer ${config.apiKey}`, 'Accept': 'application/json' } }); diff --git a/backend-node/src/services/batchSchedulerService.ts b/backend-node/src/services/batchSchedulerService.ts index 4a46f595..ea2f7f89 100644 --- a/backend-node/src/services/batchSchedulerService.ts +++ b/backend-node/src/services/batchSchedulerService.ts @@ -10,19 +10,18 @@ import { logger } from '../utils/logger'; export class BatchSchedulerService { private static scheduledTasks: Map = new Map(); private static isInitialized = false; + private static executingBatches: Set = new Set(); // 실행 중인 배치 추적 /** * 스케줄러 초기화 */ static async initialize() { - if (this.isInitialized) { - logger.info('배치 스케줄러가 이미 초기화되었습니다.'); - return; - } - try { logger.info('배치 스케줄러 초기화 시작...'); + // 기존 모든 스케줄 정리 (중복 방지) + this.clearAllSchedules(); + // 활성화된 배치 설정들을 로드하여 스케줄 등록 await this.loadActiveBatchConfigs(); @@ -34,6 +33,27 @@ export class BatchSchedulerService { } } + /** + * 모든 스케줄 정리 + */ + private static clearAllSchedules() { + logger.info(`기존 스케줄 ${this.scheduledTasks.size}개 정리 중...`); + + for (const [id, task] of this.scheduledTasks) { + try { + task.stop(); + task.destroy(); + logger.info(`스케줄 정리 완료: ID ${id}`); + } catch (error) { + logger.error(`스케줄 정리 실패: ID ${id}`, error); + } + } + + this.scheduledTasks.clear(); + this.isInitialized = false; + logger.info('모든 스케줄 정리 완료'); + } + /** * 활성화된 배치 설정들을 로드하여 스케줄 등록 */ @@ -80,8 +100,23 @@ export class BatchSchedulerService { // 새로운 스케줄 등록 const task = cron.schedule(cron_schedule, async () => { + // 중복 실행 방지 체크 + if (this.executingBatches.has(id)) { + logger.warn(`⚠️ 배치가 이미 실행 중입니다. 건너뜀: ${batch_name} (ID: ${id})`); + return; + } + logger.info(`🔄 스케줄 배치 실행 시작: ${batch_name} (ID: ${id})`); - await this.executeBatchConfig(config); + + // 실행 중 플래그 설정 + this.executingBatches.add(id); + + try { + await this.executeBatchConfig(config); + } finally { + // 실행 완료 후 플래그 제거 + this.executingBatches.delete(id); + } }); // 스케줄 시작 (기본적으로 시작되지만 명시적으로 호출) @@ -149,7 +184,7 @@ export class BatchSchedulerService { /** * 배치 설정 실행 */ - private static async executeBatchConfig(config: any) { + static async executeBatchConfig(config: any) { const startTime = new Date(); let executionLog: any = null; @@ -168,7 +203,11 @@ export class BatchSchedulerService { if (!executionLogResponse.success || !executionLogResponse.data) { logger.error(`배치 실행 로그 생성 실패: ${config.batch_name}`, executionLogResponse.message); - return; + return { + totalRecords: 0, + successRecords: 0, + failedRecords: 1 + }; } executionLog = executionLogResponse.data; @@ -187,6 +226,10 @@ export class BatchSchedulerService { }); logger.info(`배치 실행 완료: ${config.batch_name} (처리된 레코드: ${result.totalRecords})`); + + // 성공 결과 반환 + return result; + } catch (error) { logger.error(`배치 실행 실패: ${config.batch_name}`, error); @@ -200,6 +243,13 @@ export class BatchSchedulerService { error_details: error instanceof Error ? error.stack : String(error) }); } + + // 실패 시에도 결과 반환 + return { + totalRecords: 0, + successRecords: 0, + failedRecords: 1 + }; } } diff --git a/backend-node/src/services/batchService.ts b/backend-node/src/services/batchService.ts index 4120e47f..80cd9064 100644 --- a/backend-node/src/services/batchService.ts +++ b/backend-node/src/services/batchService.ts @@ -722,38 +722,56 @@ export class BatchService { const updateColumns = columns.filter(col => col !== primaryKeyColumn); const updateSet = updateColumns.map(col => `${col} = EXCLUDED.${col}`).join(', '); - // 먼저 해당 레코드가 존재하는지 확인 - const checkQuery = `SELECT COUNT(*) as count FROM ${tableName} WHERE ${primaryKeyColumn} = $1`; - const existsResult = await prisma.$queryRawUnsafe(checkQuery, record[primaryKeyColumn]); - const exists = (existsResult as any)[0]?.count > 0; - - let query: string; - if (exists && updateSet) { - // 기존 레코드가 있으면 UPDATE (값이 다른 경우에만) - const whereConditions = updateColumns.map((col, index) => - `${col} IS DISTINCT FROM $${index + 2}` - ).join(' OR '); + // 트랜잭션 내에서 처리하여 연결 관리 최적화 + const result = await prisma.$transaction(async (tx) => { + // 먼저 해당 레코드가 존재하는지 확인 + const checkQuery = `SELECT COUNT(*) as count FROM ${tableName} WHERE ${primaryKeyColumn} = $1`; + const existsResult = await tx.$queryRawUnsafe(checkQuery, record[primaryKeyColumn]); + const exists = (existsResult as any)[0]?.count > 0; - query = `UPDATE ${tableName} SET ${updateSet.replace(/EXCLUDED\./g, '')} - WHERE ${primaryKeyColumn} = $1 AND (${whereConditions})`; + let operationResult = 'no_change'; - // 파라미터: [primaryKeyValue, ...updateValues] - const updateValues = [record[primaryKeyColumn], ...updateColumns.map(col => record[col])]; - const updateResult = await prisma.$executeRawUnsafe(query, ...updateValues); - - if (updateResult > 0) { - console.log(`[BatchService] 레코드 업데이트: ${primaryKeyColumn}=${record[primaryKeyColumn]}`); + if (exists && updateSet) { + // 기존 레코드가 있으면 UPDATE (값이 다른 경우에만) + const whereConditions = updateColumns.map((col, index) => { + // 날짜/시간 컬럼에 대한 타입 캐스팅 처리 + if (col.toLowerCase().includes('date') || + col.toLowerCase().includes('time') || + col.toLowerCase().includes('created') || + col.toLowerCase().includes('updated') || + col.toLowerCase().includes('reg')) { + return `${col} IS DISTINCT FROM $${index + 2}::timestamp`; + } + return `${col} IS DISTINCT FROM $${index + 2}`; + }).join(' OR '); + + const query = `UPDATE ${tableName} SET ${updateSet.replace(/EXCLUDED\./g, '')} + WHERE ${primaryKeyColumn} = $1 AND (${whereConditions})`; + + // 파라미터: [primaryKeyValue, ...updateValues] + const updateValues = [record[primaryKeyColumn], ...updateColumns.map(col => record[col])]; + const updateResult = await tx.$executeRawUnsafe(query, ...updateValues); + + if (updateResult > 0) { + console.log(`[BatchService] 레코드 업데이트: ${primaryKeyColumn}=${record[primaryKeyColumn]}`); + operationResult = 'updated'; + } else { + console.log(`[BatchService] 레코드 변경사항 없음: ${primaryKeyColumn}=${record[primaryKeyColumn]}`); + operationResult = 'no_change'; + } + } else if (!exists) { + // 새 레코드 삽입 + const query = `INSERT INTO ${tableName} (${columns.join(', ')}) VALUES (${placeholders})`; + await tx.$executeRawUnsafe(query, ...values); + console.log(`[BatchService] 새 레코드 삽입: ${primaryKeyColumn}=${record[primaryKeyColumn]}`); + operationResult = 'inserted'; } else { - console.log(`[BatchService] 레코드 변경사항 없음: ${primaryKeyColumn}=${record[primaryKeyColumn]}`); + console.log(`[BatchService] 레코드 이미 존재 (변경사항 없음): ${primaryKeyColumn}=${record[primaryKeyColumn]}`); + operationResult = 'no_change'; } - } else if (!exists) { - // 새 레코드 삽입 - query = `INSERT INTO ${tableName} (${columns.join(', ')}) VALUES (${placeholders})`; - await prisma.$executeRawUnsafe(query, ...values); - console.log(`[BatchService] 새 레코드 삽입: ${primaryKeyColumn}=${record[primaryKeyColumn]}`); - } else { - console.log(`[BatchService] 레코드 이미 존재 (변경사항 없음): ${primaryKeyColumn}=${record[primaryKeyColumn]}`); - } + + return operationResult; + }); successCount++; } catch (error) {