import cron, { ScheduledTask } from "node-cron"; import { BatchService } from "./batchService"; import { BatchExecutionLogService } from "./batchExecutionLogService"; import { logger } from "../utils/logger"; import { query } from "../database/db"; export class BatchSchedulerService { private static scheduledTasks: Map = new Map(); /** * 모든 활성 배치의 스케줄링 초기화 */ static async initializeScheduler() { try { logger.info("배치 스케줄러 초기화 시작"); const batchConfigsResponse = await BatchService.getBatchConfigs({ is_active: "Y", }); if (!batchConfigsResponse.success || !batchConfigsResponse.data) { logger.warn("스케줄링할 활성 배치 설정이 없습니다."); return; } const batchConfigs = batchConfigsResponse.data; logger.info(`${batchConfigs.length}개의 배치 설정 스케줄링 등록`); for (const config of batchConfigs) { await this.scheduleBatch(config); } logger.info("배치 스케줄러 초기화 완료"); } catch (error) { logger.error("배치 스케줄러 초기화 중 오류 발생:", error); } } /** * 개별 배치 작업 스케줄링 */ static async scheduleBatch(config: any) { try { // 기존 스케줄이 있으면 제거 if (this.scheduledTasks.has(config.id)) { this.scheduledTasks.get(config.id)?.stop(); this.scheduledTasks.delete(config.id); } if (config.is_active !== "Y") { logger.info( `배치 스케줄링 건너뜀 (비활성 상태): ${config.batch_name} (ID: ${config.id})` ); return; } if (!cron.validate(config.cron_schedule)) { logger.error( `유효하지 않은 Cron 표현식: ${config.cron_schedule} (Batch ID: ${config.id})` ); return; } logger.info( `배치 스케줄 등록: ${config.batch_name} (ID: ${config.id}, Cron: ${config.cron_schedule})` ); const task = cron.schedule(config.cron_schedule, async () => { logger.info( `스케줄에 의한 배치 실행 시작: ${config.batch_name} (ID: ${config.id})` ); await this.executeBatchConfig(config); }); this.scheduledTasks.set(config.id, task); } catch (error) { logger.error(`배치 스케줄링 중 오류 발생 (ID: ${config.id}):`, error); } } /** * 배치 스케줄 업데이트 (설정 변경 시 호출) */ static async updateBatchSchedule( configId: number, executeImmediately: boolean = true ) { try { const result = await BatchService.getBatchConfigById(configId); if (!result.success || !result.data) { // 설정이 없으면 스케줄 제거 if (this.scheduledTasks.has(configId)) { this.scheduledTasks.get(configId)?.stop(); this.scheduledTasks.delete(configId); } return; } const config = result.data; // 스케줄 재등록 await this.scheduleBatch(config); // 즉시 실행 옵션이 있으면 실행 /* if (executeImmediately && config.is_active === "Y") { logger.info(`배치 설정 변경 후 즉시 실행: ${config.batch_name}`); this.executeBatchConfig(config).catch((err) => logger.error(`즉시 실행 중 오류 발생:`, err) ); } */ } catch (error) { logger.error(`배치 스케줄 업데이트 실패: ID ${configId}`, error); } } /** * 배치 설정 실행 */ static async executeBatchConfig(config: any) { const startTime = new Date(); let executionLog: any = null; try { logger.info(`배치 실행 시작: ${config.batch_name} (ID: ${config.id})`); // 매핑 정보가 없으면 상세 조회로 다시 가져오기 if (!config.batch_mappings || config.batch_mappings.length === 0) { const fullConfig = await BatchService.getBatchConfigById(config.id); if (fullConfig.success && fullConfig.data) { config = fullConfig.data; } } // 실행 로그 생성 const executionLogResponse = await BatchExecutionLogService.createExecutionLog({ batch_config_id: config.id, company_code: config.company_code, execution_status: "RUNNING", start_time: startTime, total_records: 0, success_records: 0, failed_records: 0, }); if (!executionLogResponse.success || !executionLogResponse.data) { logger.error( `배치 실행 로그 생성 실패: ${config.batch_name}`, executionLogResponse.message ); return { totalRecords: 0, successRecords: 0, failedRecords: 1, }; } executionLog = executionLogResponse.data; // 실제 배치 실행 로직 (수동 실행과 동일한 로직 사용) const result = await this.executeBatchMappings(config); // 실행 로그 업데이트 (성공) await BatchExecutionLogService.updateExecutionLog(executionLog.id, { execution_status: "SUCCESS", end_time: new Date(), duration_ms: Date.now() - startTime.getTime(), total_records: result.totalRecords, success_records: result.successRecords, failed_records: result.failedRecords, }); logger.info( `배치 실행 완료: ${config.batch_name} (처리된 레코드: ${result.totalRecords})` ); // 성공 결과 반환 return result; } catch (error) { logger.error(`배치 실행 중 오류 발생: ${config.batch_name}`, error); // 실행 로그 업데이트 (실패) if (executionLog) { await BatchExecutionLogService.updateExecutionLog(executionLog.id, { execution_status: "FAILED", end_time: new Date(), duration_ms: Date.now() - startTime.getTime(), error_message: error instanceof Error ? error.message : "알 수 없는 오류", }); } // 실패 결과 반환 return { totalRecords: 0, successRecords: 0, failedRecords: 1, }; } } /** * 배치 매핑 실행 (수동 실행과 동일한 로직) */ private static async executeBatchMappings(config: any) { let totalRecords = 0; let successRecords = 0; let failedRecords = 0; if (!config.batch_mappings || config.batch_mappings.length === 0) { logger.warn(`배치 매핑이 없습니다: ${config.batch_name}`); return { totalRecords, successRecords, failedRecords }; } // 테이블별로 매핑을 그룹화 // 고정값 매핑(mapping_type === 'fixed')은 별도 그룹으로 분리하지 않고 나중에 처리 const tableGroups = new Map(); const fixedMappingsGlobal: typeof config.batch_mappings = []; for (const mapping of config.batch_mappings) { // 고정값 매핑은 별도로 모아둠 (FROM 소스가 필요 없음) if (mapping.mapping_type === "fixed") { fixedMappingsGlobal.push(mapping); continue; } const key = `${mapping.from_connection_type}:${mapping.from_connection_id || "internal"}:${mapping.from_table_name}`; if (!tableGroups.has(key)) { tableGroups.set(key, []); } tableGroups.get(key)!.push(mapping); } // 고정값 매핑만 있고 일반 매핑이 없는 경우 처리 if (tableGroups.size === 0 && fixedMappingsGlobal.length > 0) { logger.warn( `일반 매핑이 없고 고정값 매핑만 있습니다. 고정값만으로는 배치를 실행할 수 없습니다.` ); return { totalRecords, successRecords, failedRecords }; } // 각 테이블 그룹별로 처리 for (const [tableKey, mappings] of tableGroups) { try { const firstMapping = mappings[0]; logger.info( `테이블 처리 시작: ${tableKey} -> ${mappings.length}개 컬럼 매핑` ); let fromData: any[] = []; // FROM 데이터 조회 (DB 또는 REST API) if (firstMapping.from_connection_type === "restapi") { // REST API에서 데이터 조회 logger.info( `REST API에서 데이터 조회: ${firstMapping.from_api_url}${firstMapping.from_table_name}` ); const { BatchExternalDbService } = await import( "./batchExternalDbService" ); // auth_service_name이 설정된 경우 auth_tokens에서 토큰 조회 (멀티테넌시 적용) let apiKey = firstMapping.from_api_key || ""; if (config.auth_service_name) { let tokenQuery: string; let tokenParams: any[]; if (config.company_code === "*") { // 최고 관리자 배치: 모든 회사 토큰 조회 가능 tokenQuery = `SELECT access_token FROM auth_tokens WHERE service_name = $1 ORDER BY created_date DESC LIMIT 1`; tokenParams = [config.auth_service_name]; } else { // 일반 회사 배치: 자신의 회사 토큰만 조회 tokenQuery = `SELECT access_token FROM auth_tokens WHERE service_name = $1 AND company_code = $2 ORDER BY created_date DESC LIMIT 1`; tokenParams = [config.auth_service_name, config.company_code]; } const tokenResult = await query<{ access_token: string }>( tokenQuery, tokenParams ); if (tokenResult.length > 0 && tokenResult[0].access_token) { apiKey = tokenResult[0].access_token; logger.info( `auth_tokens에서 토큰 조회 성공: ${config.auth_service_name}` ); } else { logger.warn( `auth_tokens에서 토큰을 찾을 수 없음: ${config.auth_service_name}` ); } } // 👇 Body 파라미터 추가 (POST 요청 시) const apiResult = await BatchExternalDbService.getDataFromRestApi( firstMapping.from_api_url!, apiKey, firstMapping.from_table_name, (firstMapping.from_api_method as | "GET" | "POST" | "PUT" | "DELETE") || "GET", mappings.map((m: any) => m.from_column_name), 100, // limit // 파라미터 정보 전달 firstMapping.from_api_param_type, firstMapping.from_api_param_name, firstMapping.from_api_param_value, firstMapping.from_api_param_source, // 👇 Body 전달 (FROM - REST API - POST 요청) firstMapping.from_api_body ); if (apiResult.success && apiResult.data) { // 데이터 배열 경로가 설정되어 있으면 해당 경로에서 배열 추출 if (config.data_array_path) { const extractArrayByPath = (obj: any, path: string): any[] => { if (!path) return Array.isArray(obj) ? obj : [obj]; const keys = path.split("."); let current = obj; for (const key of keys) { if (current === null || current === undefined) return []; current = current[key]; } return Array.isArray(current) ? current : current ? [current] : []; }; // apiResult.data가 단일 객체인 경우 (API 응답 전체) const rawData = Array.isArray(apiResult.data) && apiResult.data.length === 1 ? apiResult.data[0] : apiResult.data; fromData = extractArrayByPath(rawData, config.data_array_path); logger.info( `데이터 배열 경로 '${config.data_array_path}'에서 ${fromData.length}개 레코드 추출` ); } else { fromData = apiResult.data; } } else { throw new Error(`REST API 데이터 조회 실패: ${apiResult.message}`); } } else { // DB에서 데이터 조회 const fromColumns = mappings.map((m: any) => m.from_column_name); fromData = await BatchService.getDataFromTableWithColumns( firstMapping.from_table_name, fromColumns, firstMapping.from_connection_type as "internal" | "external", firstMapping.from_connection_id || undefined ); } totalRecords += fromData.length; // 컬럼 매핑 적용하여 TO 테이블 형식으로 변환 // 유틸리티 함수: 점 표기법을 사용하여 중첩된 객체 값 가져오기 const getValueByPath = (obj: any, path: string) => { if (!path) return undefined; // path가 'response.access_token' 처럼 점을 포함하는 경우 if (path.includes(".")) { return path.split(".").reduce((acc, part) => acc && acc[part], obj); } // 단순 키인 경우 return obj[path]; }; const mappedData = fromData.map((row) => { const mappedRow: any = {}; for (const mapping of mappings) { // 고정값 매핑은 이미 분리되어 있으므로 여기서는 처리하지 않음 if (mapping.mapping_type === "fixed") { continue; } // DB → REST API 배치인지 확인 if ( firstMapping.to_connection_type === "restapi" && mapping.to_api_body ) { // DB → REST API: 원본 컬럼명을 키로 사용 (템플릿 처리용) mappedRow[mapping.from_column_name] = row[mapping.from_column_name]; } else { // REST API -> DB (POST 요청 포함) 또는 DB -> DB // row[mapping.from_column_name] 대신 getValueByPath 사용 const value = getValueByPath(row, mapping.from_column_name); mappedRow[mapping.to_column_name] = value; } } // 고정값 매핑 적용 (전역으로 분리된 fixedMappingsGlobal 사용) for (const fixedMapping of fixedMappingsGlobal) { // from_column_name에 고정값이 저장되어 있음 mappedRow[fixedMapping.to_column_name] = fixedMapping.from_column_name; } // 멀티테넌시: TO가 DB일 때 company_code 자동 주입 // - 배치 설정에 company_code가 있고 // - 매핑에서 company_code를 명시적으로 다루지 않은 경우만 if ( firstMapping.to_connection_type !== "restapi" && config.company_code && mappedRow.company_code === undefined ) { mappedRow.company_code = config.company_code; } return mappedRow; }); // TO 테이블에 데이터 삽입 (DB 또는 REST API) let insertResult: { successCount: number; failedCount: number }; if (firstMapping.to_connection_type === "restapi") { // REST API로 데이터 전송 logger.info( `REST API로 데이터 전송: ${firstMapping.to_api_url}${firstMapping.to_table_name}` ); const { BatchExternalDbService } = await import( "./batchExternalDbService" ); // DB → REST API 배치인지 확인 (to_api_body가 있으면 템플릿 기반) const hasTemplate = mappings.some((m: any) => m.to_api_body); if (hasTemplate) { // 템플릿 기반 REST API 전송 (DB → REST API 배치) const templateBody = firstMapping.to_api_body || "{}"; logger.info( `템플릿 기반 REST API 전송, Request Body 템플릿: ${templateBody}` ); // URL 경로 컬럼 찾기 (PUT/DELETE용) const urlPathColumn = mappings.find( (m: any) => m.to_column_name === "URL_PATH_PARAM" )?.from_column_name; const apiResult = await BatchExternalDbService.sendDataToRestApiWithTemplate( firstMapping.to_api_url!, firstMapping.to_api_key!, firstMapping.to_table_name, (firstMapping.to_api_method as "POST" | "PUT" | "DELETE") || "POST", templateBody, mappedData, urlPathColumn ); if (apiResult.success && apiResult.data) { insertResult = apiResult.data; } else { throw new Error( `템플릿 기반 REST API 데이터 전송 실패: ${apiResult.message}` ); } } else { // 기존 REST API 전송 (REST API → DB 배치) - 사실 이 경우는 거의 없음 (REST to REST) // 지원하지 않음 logger.warn( "REST API -> REST API (단순 매핑)은 아직 지원하지 않습니다." ); insertResult = { successCount: 0, failedCount: 0 }; } } else { // DB에 데이터 삽입 (save_mode, conflict_key 지원) insertResult = await BatchService.insertDataToTable( firstMapping.to_table_name, mappedData, firstMapping.to_connection_type as "internal" | "external", firstMapping.to_connection_id || undefined, (config.save_mode as "INSERT" | "UPSERT") || "INSERT", config.conflict_key || undefined ); } successRecords += insertResult.successCount; failedRecords += insertResult.failedCount; } catch (error) { logger.error(`테이블 처리 중 오류 발생: ${tableKey}`, error); // 해당 테이블 처리 실패는 전체 실패로 간주하지 않고, 실패 카운트만 증가? // 여기서는 일단 실패 로그만 남기고 계속 진행 (필요시 정책 변경) } } return { totalRecords, successRecords, failedRecords }; } /** * 개별 배치 작업 스케줄링 (scheduleBatch의 별칭) */ static async scheduleBatchConfig(config: any) { return this.scheduleBatch(config); } }