// 배치 스케줄러 서비스 // 작성일: 2024-12-24 import * as cron from 'node-cron'; import prisma from '../config/database'; import { BatchService } from './batchService'; import { BatchExecutionLogService } from './batchExecutionLogService'; import { logger } from '../utils/logger'; export class BatchSchedulerService { private static scheduledTasks: Map = new Map(); private static isInitialized = false; /** * 스케줄러 초기화 */ static async initialize() { if (this.isInitialized) { logger.info('배치 스케줄러가 이미 초기화되었습니다.'); return; } try { logger.info('배치 스케줄러 초기화 시작...'); // 활성화된 배치 설정들을 로드하여 스케줄 등록 await this.loadActiveBatchConfigs(); this.isInitialized = true; logger.info('배치 스케줄러 초기화 완료'); } catch (error) { logger.error('배치 스케줄러 초기화 실패:', error); throw error; } } /** * 활성화된 배치 설정들을 로드하여 스케줄 등록 */ private static async loadActiveBatchConfigs() { try { const activeConfigs = await prisma.batch_configs.findMany({ where: { is_active: 'Y' }, include: { batch_mappings: true } }); logger.info(`활성화된 배치 설정 ${activeConfigs.length}개 발견`); for (const config of activeConfigs) { await this.scheduleBatchConfig(config); } } catch (error) { logger.error('활성화된 배치 설정 로드 실패:', error); throw error; } } /** * 배치 설정을 스케줄에 등록 */ static async scheduleBatchConfig(config: any) { try { const { id, batch_name, cron_schedule } = config; // 기존 스케줄이 있다면 제거 if (this.scheduledTasks.has(id)) { this.scheduledTasks.get(id)?.stop(); this.scheduledTasks.delete(id); } // cron 스케줄 유효성 검사 if (!cron.validate(cron_schedule)) { logger.error(`잘못된 cron 스케줄: ${cron_schedule} (배치 ID: ${id})`); return; } // 새로운 스케줄 등록 const task = cron.schedule(cron_schedule, async () => { logger.info(`🔄 스케줄 배치 실행 시작: ${batch_name} (ID: ${id})`); await this.executeBatchConfig(config); }); // 스케줄 시작 (기본적으로 시작되지만 명시적으로 호출) task.start(); this.scheduledTasks.set(id, task); logger.info(`배치 스케줄 등록 완료: ${batch_name} (ID: ${id}, Schedule: ${cron_schedule}) - 스케줄 시작됨`); } catch (error) { logger.error(`배치 스케줄 등록 실패 (ID: ${config.id}):`, error); } } /** * 배치 설정 스케줄 제거 */ static async unscheduleBatchConfig(batchConfigId: number) { try { if (this.scheduledTasks.has(batchConfigId)) { this.scheduledTasks.get(batchConfigId)?.stop(); this.scheduledTasks.delete(batchConfigId); logger.info(`배치 스케줄 제거 완료 (ID: ${batchConfigId})`); } } catch (error) { logger.error(`배치 스케줄 제거 실패 (ID: ${batchConfigId}):`, error); } } /** * 배치 설정 업데이트 시 스케줄 재등록 */ static async updateBatchSchedule(configId: number, executeImmediately: boolean = true) { try { // 기존 스케줄 제거 await this.unscheduleBatchConfig(configId); // 업데이트된 배치 설정 조회 const config = await prisma.batch_configs.findUnique({ where: { id: configId }, include: { batch_mappings: true } }); if (!config) { logger.warn(`배치 설정을 찾을 수 없습니다: ID ${configId}`); return; } // 활성화된 배치만 다시 스케줄 등록 if (config.is_active === 'Y') { await this.scheduleBatchConfig(config); logger.info(`배치 스케줄 업데이트 완료: ${config.batch_name} (ID: ${configId})`); // 활성화 시 즉시 실행 (옵션) if (executeImmediately) { logger.info(`🚀 배치 활성화 즉시 실행: ${config.batch_name} (ID: ${configId})`); await this.executeBatchConfig(config); } } else { logger.info(`비활성화된 배치 스케줄 제거: ${config.batch_name} (ID: ${configId})`); } } catch (error) { logger.error(`배치 스케줄 업데이트 실패: ID ${configId}`, error); } } /** * 배치 설정 실행 */ private static async executeBatchConfig(config: any) { const startTime = new Date(); let executionLog: any = null; try { logger.info(`배치 실행 시작: ${config.batch_name} (ID: ${config.id})`); // 실행 로그 생성 const executionLogResponse = await BatchExecutionLogService.createExecutionLog({ batch_config_id: config.id, execution_status: 'RUNNING', start_time: startTime, total_records: 0, success_records: 0, failed_records: 0 }); if (!executionLogResponse.success || !executionLogResponse.data) { logger.error(`배치 실행 로그 생성 실패: ${config.batch_name}`, executionLogResponse.message); return; } executionLog = executionLogResponse.data; // 실제 배치 실행 로직 (수동 실행과 동일한 로직 사용) const result = await this.executeBatchMappings(config); // 실행 로그 업데이트 (성공) await BatchExecutionLogService.updateExecutionLog(executionLog.id, { execution_status: 'SUCCESS', end_time: new Date(), duration_ms: Date.now() - startTime.getTime(), total_records: result.totalRecords, success_records: result.successRecords, failed_records: result.failedRecords }); logger.info(`배치 실행 완료: ${config.batch_name} (처리된 레코드: ${result.totalRecords})`); } catch (error) { logger.error(`배치 실행 실패: ${config.batch_name}`, error); // 실행 로그 업데이트 (실패) if (executionLog) { await BatchExecutionLogService.updateExecutionLog(executionLog.id, { execution_status: 'FAILED', end_time: new Date(), duration_ms: Date.now() - startTime.getTime(), error_message: error instanceof Error ? error.message : '알 수 없는 오류', error_details: error instanceof Error ? error.stack : String(error) }); } } } /** * 배치 매핑 실행 (수동 실행과 동일한 로직) */ private static async executeBatchMappings(config: any) { let totalRecords = 0; let successRecords = 0; let failedRecords = 0; if (!config.batch_mappings || config.batch_mappings.length === 0) { logger.warn(`배치 매핑이 없습니다: ${config.batch_name}`); return { totalRecords, successRecords, failedRecords }; } // 테이블별로 매핑을 그룹화 const tableGroups = new Map(); for (const mapping of config.batch_mappings) { const key = `${mapping.from_connection_type}:${mapping.from_connection_id || 'internal'}:${mapping.from_table_name}`; if (!tableGroups.has(key)) { tableGroups.set(key, []); } tableGroups.get(key)!.push(mapping); } // 각 테이블 그룹별로 처리 for (const [tableKey, mappings] of tableGroups) { try { const firstMapping = mappings[0]; logger.info(`테이블 처리 시작: ${tableKey} -> ${mappings.length}개 컬럼 매핑`); let fromData: any[] = []; // FROM 데이터 조회 (DB 또는 REST API) if (firstMapping.from_connection_type === 'restapi') { // REST API에서 데이터 조회 logger.info(`REST API에서 데이터 조회: ${firstMapping.from_api_url}${firstMapping.from_table_name}`); const { BatchExternalDbService } = await import('./batchExternalDbService'); const apiResult = await BatchExternalDbService.getDataFromRestApi( firstMapping.from_api_url!, firstMapping.from_api_key!, firstMapping.from_table_name, firstMapping.from_api_method as 'GET' | 'POST' | 'PUT' | 'DELETE' || 'GET', mappings.map((m: any) => m.from_column_name), 100, // limit // 파라미터 정보 전달 firstMapping.from_api_param_type, firstMapping.from_api_param_name, firstMapping.from_api_param_value, firstMapping.from_api_param_source ); if (apiResult.success && apiResult.data) { fromData = apiResult.data; } else { throw new Error(`REST API 데이터 조회 실패: ${apiResult.message}`); } } else { // DB에서 데이터 조회 const fromColumns = mappings.map((m: any) => m.from_column_name); fromData = await BatchService.getDataFromTableWithColumns( firstMapping.from_table_name, fromColumns, firstMapping.from_connection_type as 'internal' | 'external', firstMapping.from_connection_id || undefined ); } totalRecords += fromData.length; // 컬럼 매핑 적용하여 TO 테이블 형식으로 변환 const mappedData = fromData.map(row => { const mappedRow: any = {}; for (const mapping of mappings) { // DB → REST API 배치인지 확인 if (firstMapping.to_connection_type === 'restapi' && mapping.to_api_body) { // DB → REST API: 원본 컬럼명을 키로 사용 (템플릿 처리용) mappedRow[mapping.from_column_name] = row[mapping.from_column_name]; } else { // 기존 로직: to_column_name을 키로 사용 mappedRow[mapping.to_column_name] = row[mapping.from_column_name]; } } return mappedRow; }); // TO 테이블에 데이터 삽입 (DB 또는 REST API) let insertResult: { successCount: number; failedCount: number }; if (firstMapping.to_connection_type === 'restapi') { // REST API로 데이터 전송 logger.info(`REST API로 데이터 전송: ${firstMapping.to_api_url}${firstMapping.to_table_name}`); const { BatchExternalDbService } = await import('./batchExternalDbService'); // DB → REST API 배치인지 확인 (to_api_body가 있으면 템플릿 기반) const hasTemplate = mappings.some((m: any) => m.to_api_body); if (hasTemplate) { // 템플릿 기반 REST API 전송 (DB → REST API 배치) const templateBody = firstMapping.to_api_body || '{}'; logger.info(`템플릿 기반 REST API 전송, Request Body 템플릿: ${templateBody}`); // URL 경로 컬럼 찾기 (PUT/DELETE용) const urlPathColumn = mappings.find((m: any) => m.to_column_name === 'URL_PATH_PARAM')?.from_column_name; const apiResult = await BatchExternalDbService.sendDataToRestApiWithTemplate( firstMapping.to_api_url!, firstMapping.to_api_key!, firstMapping.to_table_name, firstMapping.to_api_method as 'POST' | 'PUT' | 'DELETE' || 'POST', templateBody, mappedData, urlPathColumn ); if (apiResult.success && apiResult.data) { insertResult = apiResult.data; } else { throw new Error(`템플릿 기반 REST API 데이터 전송 실패: ${apiResult.message}`); } } else { // 기존 REST API 전송 (REST API → DB 배치) const apiResult = await BatchExternalDbService.sendDataToRestApi( firstMapping.to_api_url!, firstMapping.to_api_key!, firstMapping.to_table_name, firstMapping.to_api_method as 'POST' | 'PUT' || 'POST', mappedData ); if (apiResult.success && apiResult.data) { insertResult = apiResult.data; } else { throw new Error(`REST API 데이터 전송 실패: ${apiResult.message}`); } } } else { // DB에 데이터 삽입 insertResult = await BatchService.insertDataToTable( firstMapping.to_table_name, mappedData, firstMapping.to_connection_type as 'internal' | 'external', firstMapping.to_connection_id || undefined ); } successRecords += insertResult.successCount; failedRecords += insertResult.failedCount; logger.info(`테이블 처리 완료: ${insertResult.successCount}개 성공, ${insertResult.failedCount}개 실패`); } catch (error) { logger.error(`테이블 처리 실패: ${tableKey}`, error); failedRecords += 1; } } return { totalRecords, successRecords, failedRecords }; } /** * 배치 매핑 처리 (기존 메서드 - 사용 안 함) */ private static async processBatchMappings(config: any) { const { batch_mappings } = config; let totalRecords = 0; let successRecords = 0; let failedRecords = 0; if (!batch_mappings || batch_mappings.length === 0) { logger.warn(`배치 매핑이 없습니다: ${config.batch_name}`); return { totalRecords, successRecords, failedRecords }; } for (const mapping of batch_mappings) { try { logger.info(`매핑 처리 시작: ${mapping.from_table_name} -> ${mapping.to_table_name}`); // FROM 테이블에서 데이터 조회 const fromData = await this.getDataFromSource(mapping); totalRecords += fromData.length; // TO 테이블에 데이터 삽입 const insertResult = await this.insertDataToTarget(mapping, fromData); successRecords += insertResult.successCount; failedRecords += insertResult.failedCount; logger.info(`매핑 처리 완료: ${insertResult.successCount}개 성공, ${insertResult.failedCount}개 실패`); } catch (error) { logger.error(`매핑 처리 실패: ${mapping.from_table_name} -> ${mapping.to_table_name}`, error); failedRecords += 1; } } return { totalRecords, successRecords, failedRecords }; } /** * FROM 테이블에서 데이터 조회 */ private static async getDataFromSource(mapping: any) { try { if (mapping.from_connection_type === 'internal') { // 내부 DB에서 조회 const result = await prisma.$queryRawUnsafe( `SELECT * FROM ${mapping.from_table_name}` ); return result as any[]; } else { // 외부 DB에서 조회 (구현 필요) logger.warn('외부 DB 조회는 아직 구현되지 않았습니다.'); return []; } } catch (error) { logger.error(`FROM 테이블 데이터 조회 실패: ${mapping.from_table_name}`, error); throw error; } } /** * TO 테이블에 데이터 삽입 */ private static async insertDataToTarget(mapping: any, data: any[]) { let successCount = 0; let failedCount = 0; try { if (mapping.to_connection_type === 'internal') { // 내부 DB에 삽입 for (const record of data) { try { // 매핑된 컬럼만 추출 const mappedData = this.mapColumns(record, mapping); await prisma.$executeRawUnsafe( `INSERT INTO ${mapping.to_table_name} (${Object.keys(mappedData).join(', ')}) VALUES (${Object.values(mappedData).map(() => '?').join(', ')})`, ...Object.values(mappedData) ); successCount++; } catch (error) { logger.error(`레코드 삽입 실패:`, error); failedCount++; } } } else { // 외부 DB에 삽입 (구현 필요) logger.warn('외부 DB 삽입은 아직 구현되지 않았습니다.'); failedCount = data.length; } } catch (error) { logger.error(`TO 테이블 데이터 삽입 실패: ${mapping.to_table_name}`, error); throw error; } return { successCount, failedCount }; } /** * 컬럼 매핑 */ private static mapColumns(record: any, mapping: any) { const mappedData: any = {}; // 단순한 컬럼 매핑 (실제로는 더 복잡한 로직 필요) mappedData[mapping.to_column_name] = record[mapping.from_column_name]; return mappedData; } /** * 모든 스케줄 중지 */ static async stopAllSchedules() { try { for (const [id, task] of this.scheduledTasks) { task.stop(); logger.info(`배치 스케줄 중지: ID ${id}`); } this.scheduledTasks.clear(); this.isInitialized = false; logger.info('모든 배치 스케줄이 중지되었습니다.'); } catch (error) { logger.error('배치 스케줄 중지 실패:', error); } } /** * 현재 등록된 스케줄 목록 조회 */ static getScheduledTasks() { return Array.from(this.scheduledTasks.keys()); } }