From 13a9521977825946de2f5c24b9d2527301c519bd Mon Sep 17 00:00:00 2001 From: kjs Date: Wed, 1 Oct 2025 13:34:56 +0900 Subject: [PATCH] =?UTF-8?q?fix:=20=EB=B0=B0=EC=B9=98=20=EC=8A=A4=EC=BC=80?= =?UTF-8?q?=EC=A4=84=EB=9F=AC=20=EC=BB=AC=EB=9F=BC=EB=AA=85=20=EC=88=98?= =?UTF-8?q?=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit batch_mappings 테이블의 실제 컬럼명으로 수정: - field_name → from_column_name, to_column_name - source_field → from_table_name, to_table_name - 전체 컬럼 구조를 실제 DB 스키마에 맞게 수정 수정된 함수: - loadActiveBatchConfigs() - updateBatchSchedule() 에러 해결: column bm.field_name does not exist --- .../src/services/batchSchedulerService.ts | 338 +++++++++++------- 1 file changed, 215 insertions(+), 123 deletions(-) diff --git a/backend-node/src/services/batchSchedulerService.ts b/backend-node/src/services/batchSchedulerService.ts index 21166f93..77863904 100644 --- a/backend-node/src/services/batchSchedulerService.ts +++ b/backend-node/src/services/batchSchedulerService.ts @@ -1,11 +1,11 @@ // 배치 스케줄러 서비스 // 작성일: 2024-12-24 -import * as cron from 'node-cron'; -import { query, queryOne } from '../database/db'; -import { BatchService } from './batchService'; -import { BatchExecutionLogService } from './batchExecutionLogService'; -import { logger } from '../utils/logger'; +import * as cron from "node-cron"; +import { query, queryOne } from "../database/db"; +import { BatchService } from "./batchService"; +import { BatchExecutionLogService } from "./batchExecutionLogService"; +import { logger } from "../utils/logger"; export class BatchSchedulerService { private static scheduledTasks: Map = new Map(); @@ -17,18 +17,18 @@ export class BatchSchedulerService { */ static async initialize() { try { - logger.info('배치 스케줄러 초기화 시작...'); - + logger.info("배치 스케줄러 초기화 시작..."); + // 기존 모든 스케줄 정리 (중복 방지) this.clearAllSchedules(); - + // 활성화된 배치 설정들을 로드하여 스케줄 등록 await this.loadActiveBatchConfigs(); - + this.isInitialized = true; - logger.info('배치 스케줄러 초기화 완료'); + logger.info("배치 스케줄러 초기화 완료"); } catch (error) { - logger.error('배치 스케줄러 초기화 실패:', error); + logger.error("배치 스케줄러 초기화 실패:", error); throw error; } } @@ -38,7 +38,7 @@ export class BatchSchedulerService { */ private static clearAllSchedules() { logger.info(`기존 스케줄 ${this.scheduledTasks.size}개 정리 중...`); - + for (const [id, task] of this.scheduledTasks) { try { task.stop(); @@ -48,10 +48,10 @@ export class BatchSchedulerService { logger.error(`스케줄 정리 실패: ID ${id}`, error); } } - + this.scheduledTasks.clear(); this.isInitialized = false; - logger.info('모든 스케줄 정리 완료'); + logger.info("모든 스케줄 정리 완료"); } /** @@ -66,13 +66,28 @@ export class BatchSchedulerService { json_build_object( 'id', bm.id, 'batch_config_id', bm.batch_config_id, - 'field_name', bm.field_name, - 'source_field', bm.source_field, - 'field_type', bm.field_type, - 'is_required', bm.is_required, - 'default_value', bm.default_value, - 'transform_function', bm.transform_function, - 'sort_order', bm.sort_order + 'from_connection_type', bm.from_connection_type, + 'from_connection_id', bm.from_connection_id, + 'from_table_name', bm.from_table_name, + 'from_column_name', bm.from_column_name, + 'from_column_type', bm.from_column_type, + 'to_connection_type', bm.to_connection_type, + 'to_connection_id', bm.to_connection_id, + 'to_table_name', bm.to_table_name, + 'to_column_name', bm.to_column_name, + 'to_column_type', bm.to_column_type, + 'mapping_order', bm.mapping_order, + 'from_api_url', bm.from_api_url, + 'from_api_key', bm.from_api_key, + 'from_api_method', bm.from_api_method, + 'from_api_param_type', bm.from_api_param_type, + 'from_api_param_name', bm.from_api_param_name, + 'from_api_param_value', bm.from_api_param_value, + 'from_api_param_source', bm.from_api_param_source, + 'to_api_url', bm.to_api_url, + 'to_api_key', bm.to_api_key, + 'to_api_method', bm.to_api_method, + 'to_api_body', bm.to_api_body ) ) FILTER (WHERE bm.id IS NOT NULL) as batch_mappings FROM batch_configs bc @@ -88,7 +103,7 @@ export class BatchSchedulerService { await this.scheduleBatchConfig(config); } } catch (error) { - logger.error('활성화된 배치 설정 로드 실패:', error); + logger.error("활성화된 배치 설정 로드 실패:", error); throw error; } } @@ -116,15 +131,17 @@ export class BatchSchedulerService { const task = cron.schedule(cron_schedule, async () => { // 중복 실행 방지 체크 if (this.executingBatches.has(id)) { - logger.warn(`⚠️ 배치가 이미 실행 중입니다. 건너뜀: ${batch_name} (ID: ${id})`); + logger.warn( + `⚠️ 배치가 이미 실행 중입니다. 건너뜀: ${batch_name} (ID: ${id})` + ); return; } - + logger.info(`🔄 스케줄 배치 실행 시작: ${batch_name} (ID: ${id})`); - + // 실행 중 플래그 설정 this.executingBatches.add(id); - + try { await this.executeBatchConfig(config); } finally { @@ -135,9 +152,11 @@ export class BatchSchedulerService { // 스케줄 시작 (기본적으로 시작되지만 명시적으로 호출) task.start(); - + this.scheduledTasks.set(id, task); - logger.info(`배치 스케줄 등록 완료: ${batch_name} (ID: ${id}, Schedule: ${cron_schedule}) - 스케줄 시작됨`); + logger.info( + `배치 스케줄 등록 완료: ${batch_name} (ID: ${id}, Schedule: ${cron_schedule}) - 스케줄 시작됨` + ); } catch (error) { logger.error(`배치 스케줄 등록 실패 (ID: ${config.id}):`, error); } @@ -161,7 +180,10 @@ export class BatchSchedulerService { /** * 배치 설정 업데이트 시 스케줄 재등록 */ - static async updateBatchSchedule(configId: number, executeImmediately: boolean = true) { + static async updateBatchSchedule( + configId: number, + executeImmediately: boolean = true + ) { try { // 기존 스케줄 제거 await this.unscheduleBatchConfig(configId); @@ -174,13 +196,28 @@ export class BatchSchedulerService { json_build_object( 'id', bm.id, 'batch_config_id', bm.batch_config_id, - 'field_name', bm.field_name, - 'source_field', bm.source_field, - 'field_type', bm.field_type, - 'is_required', bm.is_required, - 'default_value', bm.default_value, - 'transform_function', bm.transform_function, - 'sort_order', bm.sort_order + 'from_connection_type', bm.from_connection_type, + 'from_connection_id', bm.from_connection_id, + 'from_table_name', bm.from_table_name, + 'from_column_name', bm.from_column_name, + 'from_column_type', bm.from_column_type, + 'to_connection_type', bm.to_connection_type, + 'to_connection_id', bm.to_connection_id, + 'to_table_name', bm.to_table_name, + 'to_column_name', bm.to_column_name, + 'to_column_type', bm.to_column_type, + 'mapping_order', bm.mapping_order, + 'from_api_url', bm.from_api_url, + 'from_api_key', bm.from_api_key, + 'from_api_method', bm.from_api_method, + 'from_api_param_type', bm.from_api_param_type, + 'from_api_param_name', bm.from_api_param_name, + 'from_api_param_value', bm.from_api_param_value, + 'from_api_param_source', bm.from_api_param_source, + 'to_api_url', bm.to_api_url, + 'to_api_key', bm.to_api_key, + 'to_api_method', bm.to_api_method, + 'to_api_body', bm.to_api_body ) ) FILTER (WHERE bm.id IS NOT NULL) as batch_mappings FROM batch_configs bc @@ -189,7 +226,7 @@ export class BatchSchedulerService { GROUP BY bc.id`, [configId] ); - + const config = configResult[0] || null; if (!config) { @@ -198,17 +235,23 @@ export class BatchSchedulerService { } // 활성화된 배치만 다시 스케줄 등록 - if (config.is_active === 'Y') { + if (config.is_active === "Y") { await this.scheduleBatchConfig(config); - logger.info(`배치 스케줄 업데이트 완료: ${config.batch_name} (ID: ${configId})`); - + logger.info( + `배치 스케줄 업데이트 완료: ${config.batch_name} (ID: ${configId})` + ); + // 활성화 시 즉시 실행 (옵션) if (executeImmediately) { - logger.info(`🚀 배치 활성화 즉시 실행: ${config.batch_name} (ID: ${configId})`); + logger.info( + `🚀 배치 활성화 즉시 실행: ${config.batch_name} (ID: ${configId})` + ); await this.executeBatchConfig(config); } } else { - logger.info(`비활성화된 배치 스케줄 제거: ${config.batch_name} (ID: ${configId})`); + logger.info( + `비활성화된 배치 스케줄 제거: ${config.batch_name} (ID: ${configId})` + ); } } catch (error) { logger.error(`배치 스케줄 업데이트 실패: ID ${configId}`, error); @@ -226,21 +269,25 @@ export class BatchSchedulerService { logger.info(`배치 실행 시작: ${config.batch_name} (ID: ${config.id})`); // 실행 로그 생성 - const executionLogResponse = await BatchExecutionLogService.createExecutionLog({ - batch_config_id: config.id, - execution_status: 'RUNNING', - start_time: startTime, - total_records: 0, - success_records: 0, - failed_records: 0 - }); + const executionLogResponse = + await BatchExecutionLogService.createExecutionLog({ + batch_config_id: config.id, + execution_status: "RUNNING", + start_time: startTime, + total_records: 0, + success_records: 0, + failed_records: 0, + }); if (!executionLogResponse.success || !executionLogResponse.data) { - logger.error(`배치 실행 로그 생성 실패: ${config.batch_name}`, executionLogResponse.message); + logger.error( + `배치 실행 로그 생성 실패: ${config.batch_name}`, + executionLogResponse.message + ); return { totalRecords: 0, successRecords: 0, - failedRecords: 1 + failedRecords: 1, }; } @@ -251,38 +298,40 @@ export class BatchSchedulerService { // 실행 로그 업데이트 (성공) await BatchExecutionLogService.updateExecutionLog(executionLog.id, { - execution_status: 'SUCCESS', + execution_status: "SUCCESS", end_time: new Date(), duration_ms: Date.now() - startTime.getTime(), total_records: result.totalRecords, success_records: result.successRecords, - failed_records: result.failedRecords + failed_records: result.failedRecords, }); - logger.info(`배치 실행 완료: ${config.batch_name} (처리된 레코드: ${result.totalRecords})`); - + logger.info( + `배치 실행 완료: ${config.batch_name} (처리된 레코드: ${result.totalRecords})` + ); + // 성공 결과 반환 return result; - } catch (error) { logger.error(`배치 실행 실패: ${config.batch_name}`, error); // 실행 로그 업데이트 (실패) if (executionLog) { await BatchExecutionLogService.updateExecutionLog(executionLog.id, { - execution_status: 'FAILED', + execution_status: "FAILED", end_time: new Date(), duration_ms: Date.now() - startTime.getTime(), - error_message: error instanceof Error ? error.message : '알 수 없는 오류', - error_details: error instanceof Error ? error.stack : String(error) + error_message: + error instanceof Error ? error.message : "알 수 없는 오류", + error_details: error instanceof Error ? error.stack : String(error), }); } - + // 실패 시에도 결과 반환 return { totalRecords: 0, successRecords: 0, - failedRecords: 1 + failedRecords: 1, }; } } @@ -302,9 +351,9 @@ export class BatchSchedulerService { // 테이블별로 매핑을 그룹화 const tableGroups = new Map(); - + for (const mapping of config.batch_mappings) { - const key = `${mapping.from_connection_type}:${mapping.from_connection_id || 'internal'}:${mapping.from_table_name}`; + const key = `${mapping.from_connection_type}:${mapping.from_connection_id || "internal"}:${mapping.from_table_name}`; if (!tableGroups.has(key)) { tableGroups.set(key, []); } @@ -315,20 +364,30 @@ export class BatchSchedulerService { for (const [tableKey, mappings] of tableGroups) { try { const firstMapping = mappings[0]; - logger.info(`테이블 처리 시작: ${tableKey} -> ${mappings.length}개 컬럼 매핑`); - + logger.info( + `테이블 처리 시작: ${tableKey} -> ${mappings.length}개 컬럼 매핑` + ); + let fromData: any[] = []; - + // FROM 데이터 조회 (DB 또는 REST API) - if (firstMapping.from_connection_type === 'restapi') { + if (firstMapping.from_connection_type === "restapi") { // REST API에서 데이터 조회 - logger.info(`REST API에서 데이터 조회: ${firstMapping.from_api_url}${firstMapping.from_table_name}`); - const { BatchExternalDbService } = await import('./batchExternalDbService'); + logger.info( + `REST API에서 데이터 조회: ${firstMapping.from_api_url}${firstMapping.from_table_name}` + ); + const { BatchExternalDbService } = await import( + "./batchExternalDbService" + ); const apiResult = await BatchExternalDbService.getDataFromRestApi( firstMapping.from_api_url!, firstMapping.from_api_key!, firstMapping.from_table_name, - firstMapping.from_api_method as 'GET' | 'POST' | 'PUT' | 'DELETE' || 'GET', + (firstMapping.from_api_method as + | "GET" + | "POST" + | "PUT" + | "DELETE") || "GET", mappings.map((m: any) => m.from_column_name), 100, // limit // 파라미터 정보 전달 @@ -337,7 +396,7 @@ export class BatchSchedulerService { firstMapping.from_api_param_value, firstMapping.from_api_param_source ); - + if (apiResult.success && apiResult.data) { fromData = apiResult.data; } else { @@ -349,21 +408,25 @@ export class BatchSchedulerService { fromData = await BatchService.getDataFromTableWithColumns( firstMapping.from_table_name, fromColumns, - firstMapping.from_connection_type as 'internal' | 'external', + firstMapping.from_connection_type as "internal" | "external", firstMapping.from_connection_id || undefined ); } - + totalRecords += fromData.length; // 컬럼 매핑 적용하여 TO 테이블 형식으로 변환 - const mappedData = fromData.map(row => { + const mappedData = fromData.map((row) => { const mappedRow: any = {}; for (const mapping of mappings) { // DB → REST API 배치인지 확인 - if (firstMapping.to_connection_type === 'restapi' && mapping.to_api_body) { + if ( + firstMapping.to_connection_type === "restapi" && + mapping.to_api_body + ) { // DB → REST API: 원본 컬럼명을 키로 사용 (템플릿 처리용) - mappedRow[mapping.from_column_name] = row[mapping.from_column_name]; + mappedRow[mapping.from_column_name] = + row[mapping.from_column_name]; } else { // 기존 로직: to_column_name을 키로 사용 mappedRow[mapping.to_column_name] = row[mapping.from_column_name]; @@ -374,37 +437,49 @@ export class BatchSchedulerService { // TO 테이블에 데이터 삽입 (DB 또는 REST API) let insertResult: { successCount: number; failedCount: number }; - - if (firstMapping.to_connection_type === 'restapi') { + + if (firstMapping.to_connection_type === "restapi") { // REST API로 데이터 전송 - logger.info(`REST API로 데이터 전송: ${firstMapping.to_api_url}${firstMapping.to_table_name}`); - const { BatchExternalDbService } = await import('./batchExternalDbService'); - + logger.info( + `REST API로 데이터 전송: ${firstMapping.to_api_url}${firstMapping.to_table_name}` + ); + const { BatchExternalDbService } = await import( + "./batchExternalDbService" + ); + // DB → REST API 배치인지 확인 (to_api_body가 있으면 템플릿 기반) const hasTemplate = mappings.some((m: any) => m.to_api_body); - + if (hasTemplate) { // 템플릿 기반 REST API 전송 (DB → REST API 배치) - const templateBody = firstMapping.to_api_body || '{}'; - logger.info(`템플릿 기반 REST API 전송, Request Body 템플릿: ${templateBody}`); - - // URL 경로 컬럼 찾기 (PUT/DELETE용) - const urlPathColumn = mappings.find((m: any) => m.to_column_name === 'URL_PATH_PARAM')?.from_column_name; - - const apiResult = await BatchExternalDbService.sendDataToRestApiWithTemplate( - firstMapping.to_api_url!, - firstMapping.to_api_key!, - firstMapping.to_table_name, - firstMapping.to_api_method as 'POST' | 'PUT' | 'DELETE' || 'POST', - templateBody, - mappedData, - urlPathColumn - ); - + const templateBody = firstMapping.to_api_body || "{}"; + logger.info( + `템플릿 기반 REST API 전송, Request Body 템플릿: ${templateBody}` + ); + + // URL 경로 컬럼 찾기 (PUT/DELETE용) + const urlPathColumn = mappings.find( + (m: any) => m.to_column_name === "URL_PATH_PARAM" + )?.from_column_name; + + const apiResult = + await BatchExternalDbService.sendDataToRestApiWithTemplate( + firstMapping.to_api_url!, + firstMapping.to_api_key!, + firstMapping.to_table_name, + (firstMapping.to_api_method as "POST" | "PUT" | "DELETE") || + "POST", + templateBody, + mappedData, + urlPathColumn + ); + if (apiResult.success && apiResult.data) { insertResult = apiResult.data; } else { - throw new Error(`템플릿 기반 REST API 데이터 전송 실패: ${apiResult.message}`); + throw new Error( + `템플릿 기반 REST API 데이터 전송 실패: ${apiResult.message}` + ); } } else { // 기존 REST API 전송 (REST API → DB 배치) @@ -412,14 +487,16 @@ export class BatchSchedulerService { firstMapping.to_api_url!, firstMapping.to_api_key!, firstMapping.to_table_name, - firstMapping.to_api_method as 'POST' | 'PUT' || 'POST', + (firstMapping.to_api_method as "POST" | "PUT") || "POST", mappedData ); - + if (apiResult.success && apiResult.data) { insertResult = apiResult.data; } else { - throw new Error(`REST API 데이터 전송 실패: ${apiResult.message}`); + throw new Error( + `REST API 데이터 전송 실패: ${apiResult.message}` + ); } } } else { @@ -427,15 +504,17 @@ export class BatchSchedulerService { insertResult = await BatchService.insertDataToTable( firstMapping.to_table_name, mappedData, - firstMapping.to_connection_type as 'internal' | 'external', + firstMapping.to_connection_type as "internal" | "external", firstMapping.to_connection_id || undefined ); } - + successRecords += insertResult.successCount; failedRecords += insertResult.failedCount; - logger.info(`테이블 처리 완료: ${insertResult.successCount}개 성공, ${insertResult.failedCount}개 실패`); + logger.info( + `테이블 처리 완료: ${insertResult.successCount}개 성공, ${insertResult.failedCount}개 실패` + ); } catch (error) { logger.error(`테이블 처리 실패: ${tableKey}`, error); failedRecords += 1; @@ -461,7 +540,9 @@ export class BatchSchedulerService { for (const mapping of batch_mappings) { try { - logger.info(`매핑 처리 시작: ${mapping.from_table_name} -> ${mapping.to_table_name}`); + logger.info( + `매핑 처리 시작: ${mapping.from_table_name} -> ${mapping.to_table_name}` + ); // FROM 테이블에서 데이터 조회 const fromData = await this.getDataFromSource(mapping); @@ -472,9 +553,14 @@ export class BatchSchedulerService { successRecords += insertResult.successCount; failedRecords += insertResult.failedCount; - logger.info(`매핑 처리 완료: ${insertResult.successCount}개 성공, ${insertResult.failedCount}개 실패`); + logger.info( + `매핑 처리 완료: ${insertResult.successCount}개 성공, ${insertResult.failedCount}개 실패` + ); } catch (error) { - logger.error(`매핑 처리 실패: ${mapping.from_table_name} -> ${mapping.to_table_name}`, error); + logger.error( + `매핑 처리 실패: ${mapping.from_table_name} -> ${mapping.to_table_name}`, + error + ); failedRecords += 1; } } @@ -487,7 +573,7 @@ export class BatchSchedulerService { */ private static async getDataFromSource(mapping: any) { try { - if (mapping.from_connection_type === 'internal') { + if (mapping.from_connection_type === "internal") { // 내부 DB에서 조회 const result = await query( `SELECT * FROM ${mapping.from_table_name}`, @@ -496,11 +582,14 @@ export class BatchSchedulerService { return result; } else { // 외부 DB에서 조회 (구현 필요) - logger.warn('외부 DB 조회는 아직 구현되지 않았습니다.'); + logger.warn("외부 DB 조회는 아직 구현되지 않았습니다."); return []; } } catch (error) { - logger.error(`FROM 테이블 데이터 조회 실패: ${mapping.from_table_name}`, error); + logger.error( + `FROM 테이블 데이터 조회 실패: ${mapping.from_table_name}`, + error + ); throw error; } } @@ -513,19 +602,19 @@ export class BatchSchedulerService { let failedCount = 0; try { - if (mapping.to_connection_type === 'internal') { + if (mapping.to_connection_type === "internal") { // 내부 DB에 삽입 for (const record of data) { try { // 매핑된 컬럼만 추출 const mappedData = this.mapColumns(record, mapping); - + const columns = Object.keys(mappedData); const values = Object.values(mappedData); - const placeholders = values.map((_, i) => `$${i + 1}`).join(', '); - + const placeholders = values.map((_, i) => `$${i + 1}`).join(", "); + await query( - `INSERT INTO ${mapping.to_table_name} (${columns.join(', ')}) VALUES (${placeholders})`, + `INSERT INTO ${mapping.to_table_name} (${columns.join(", ")}) VALUES (${placeholders})`, values ); successCount++; @@ -536,11 +625,14 @@ export class BatchSchedulerService { } } else { // 외부 DB에 삽입 (구현 필요) - logger.warn('외부 DB 삽입은 아직 구현되지 않았습니다.'); + logger.warn("외부 DB 삽입은 아직 구현되지 않았습니다."); failedCount = data.length; } } catch (error) { - logger.error(`TO 테이블 데이터 삽입 실패: ${mapping.to_table_name}`, error); + logger.error( + `TO 테이블 데이터 삽입 실패: ${mapping.to_table_name}`, + error + ); throw error; } @@ -552,10 +644,10 @@ export class BatchSchedulerService { */ private static mapColumns(record: any, mapping: any) { const mappedData: any = {}; - + // 단순한 컬럼 매핑 (실제로는 더 복잡한 로직 필요) mappedData[mapping.to_column_name] = record[mapping.from_column_name]; - + return mappedData; } @@ -570,9 +662,9 @@ export class BatchSchedulerService { } this.scheduledTasks.clear(); this.isInitialized = false; - logger.info('모든 배치 스케줄이 중지되었습니다.'); + logger.info("모든 배치 스케줄이 중지되었습니다."); } catch (error) { - logger.error('배치 스케줄 중지 실패:', error); + logger.error("배치 스케줄 중지 실패:", error); } }