2025-09-25 11:04:16 +09:00
|
|
|
// 배치 스케줄러 서비스
|
|
|
|
|
// 작성일: 2024-12-24
|
|
|
|
|
|
|
|
|
|
import * as cron from 'node-cron';
|
2025-10-01 13:30:20 +09:00
|
|
|
import { query, queryOne } from '../database/db';
|
2025-09-25 11:04:16 +09:00
|
|
|
import { BatchService } from './batchService';
|
|
|
|
|
import { BatchExecutionLogService } from './batchExecutionLogService';
|
|
|
|
|
import { logger } from '../utils/logger';
|
|
|
|
|
|
|
|
|
|
export class BatchSchedulerService {
|
|
|
|
|
private static scheduledTasks: Map<number, cron.ScheduledTask> = new Map();
|
|
|
|
|
private static isInitialized = false;
|
2025-09-29 16:55:37 +09:00
|
|
|
private static executingBatches: Set<number> = new Set(); // 실행 중인 배치 추적
|
2025-09-25 11:04:16 +09:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 스케줄러 초기화
|
|
|
|
|
*/
|
|
|
|
|
static async initialize() {
|
|
|
|
|
try {
|
|
|
|
|
logger.info('배치 스케줄러 초기화 시작...');
|
|
|
|
|
|
2025-09-29 16:55:37 +09:00
|
|
|
// 기존 모든 스케줄 정리 (중복 방지)
|
|
|
|
|
this.clearAllSchedules();
|
|
|
|
|
|
2025-09-25 11:04:16 +09:00
|
|
|
// 활성화된 배치 설정들을 로드하여 스케줄 등록
|
|
|
|
|
await this.loadActiveBatchConfigs();
|
|
|
|
|
|
|
|
|
|
this.isInitialized = true;
|
|
|
|
|
logger.info('배치 스케줄러 초기화 완료');
|
|
|
|
|
} catch (error) {
|
|
|
|
|
logger.error('배치 스케줄러 초기화 실패:', error);
|
|
|
|
|
throw error;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2025-09-29 16:55:37 +09:00
|
|
|
/**
|
|
|
|
|
* 모든 스케줄 정리
|
|
|
|
|
*/
|
|
|
|
|
private static clearAllSchedules() {
|
|
|
|
|
logger.info(`기존 스케줄 ${this.scheduledTasks.size}개 정리 중...`);
|
|
|
|
|
|
|
|
|
|
for (const [id, task] of this.scheduledTasks) {
|
|
|
|
|
try {
|
|
|
|
|
task.stop();
|
|
|
|
|
task.destroy();
|
|
|
|
|
logger.info(`스케줄 정리 완료: ID ${id}`);
|
|
|
|
|
} catch (error) {
|
|
|
|
|
logger.error(`스케줄 정리 실패: ID ${id}`, error);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
this.scheduledTasks.clear();
|
|
|
|
|
this.isInitialized = false;
|
|
|
|
|
logger.info('모든 스케줄 정리 완료');
|
|
|
|
|
}
|
|
|
|
|
|
2025-09-25 11:04:16 +09:00
|
|
|
/**
|
|
|
|
|
* 활성화된 배치 설정들을 로드하여 스케줄 등록
|
|
|
|
|
*/
|
|
|
|
|
private static async loadActiveBatchConfigs() {
|
|
|
|
|
try {
|
2025-10-01 13:30:20 +09:00
|
|
|
const activeConfigs = await query<any>(
|
|
|
|
|
`SELECT
|
|
|
|
|
bc.*,
|
|
|
|
|
json_agg(
|
|
|
|
|
json_build_object(
|
|
|
|
|
'id', bm.id,
|
|
|
|
|
'batch_config_id', bm.batch_config_id,
|
|
|
|
|
'field_name', bm.field_name,
|
|
|
|
|
'source_field', bm.source_field,
|
|
|
|
|
'field_type', bm.field_type,
|
|
|
|
|
'is_required', bm.is_required,
|
|
|
|
|
'default_value', bm.default_value,
|
|
|
|
|
'transform_function', bm.transform_function,
|
|
|
|
|
'sort_order', bm.sort_order
|
|
|
|
|
)
|
|
|
|
|
) FILTER (WHERE bm.id IS NOT NULL) as batch_mappings
|
|
|
|
|
FROM batch_configs bc
|
|
|
|
|
LEFT JOIN batch_mappings bm ON bc.id = bm.batch_config_id
|
|
|
|
|
WHERE bc.is_active = 'Y'
|
|
|
|
|
GROUP BY bc.id`,
|
|
|
|
|
[]
|
|
|
|
|
);
|
2025-09-25 11:04:16 +09:00
|
|
|
|
|
|
|
|
logger.info(`활성화된 배치 설정 ${activeConfigs.length}개 발견`);
|
|
|
|
|
|
|
|
|
|
for (const config of activeConfigs) {
|
|
|
|
|
await this.scheduleBatchConfig(config);
|
|
|
|
|
}
|
|
|
|
|
} catch (error) {
|
|
|
|
|
logger.error('활성화된 배치 설정 로드 실패:', error);
|
|
|
|
|
throw error;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 배치 설정을 스케줄에 등록
|
|
|
|
|
*/
|
|
|
|
|
static async scheduleBatchConfig(config: any) {
|
|
|
|
|
try {
|
|
|
|
|
const { id, batch_name, cron_schedule } = config;
|
|
|
|
|
|
|
|
|
|
// 기존 스케줄이 있다면 제거
|
|
|
|
|
if (this.scheduledTasks.has(id)) {
|
|
|
|
|
this.scheduledTasks.get(id)?.stop();
|
|
|
|
|
this.scheduledTasks.delete(id);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// cron 스케줄 유효성 검사
|
|
|
|
|
if (!cron.validate(cron_schedule)) {
|
|
|
|
|
logger.error(`잘못된 cron 스케줄: ${cron_schedule} (배치 ID: ${id})`);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 새로운 스케줄 등록
|
|
|
|
|
const task = cron.schedule(cron_schedule, async () => {
|
2025-09-29 16:55:37 +09:00
|
|
|
// 중복 실행 방지 체크
|
|
|
|
|
if (this.executingBatches.has(id)) {
|
|
|
|
|
logger.warn(`⚠️ 배치가 이미 실행 중입니다. 건너뜀: ${batch_name} (ID: ${id})`);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
2025-09-26 17:29:20 +09:00
|
|
|
logger.info(`🔄 스케줄 배치 실행 시작: ${batch_name} (ID: ${id})`);
|
2025-09-29 16:55:37 +09:00
|
|
|
|
|
|
|
|
// 실행 중 플래그 설정
|
|
|
|
|
this.executingBatches.add(id);
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
await this.executeBatchConfig(config);
|
|
|
|
|
} finally {
|
|
|
|
|
// 실행 완료 후 플래그 제거
|
|
|
|
|
this.executingBatches.delete(id);
|
|
|
|
|
}
|
2025-09-25 11:04:16 +09:00
|
|
|
});
|
|
|
|
|
|
2025-09-26 17:29:20 +09:00
|
|
|
// 스케줄 시작 (기본적으로 시작되지만 명시적으로 호출)
|
|
|
|
|
task.start();
|
|
|
|
|
|
2025-09-25 11:04:16 +09:00
|
|
|
this.scheduledTasks.set(id, task);
|
2025-09-26 17:29:20 +09:00
|
|
|
logger.info(`배치 스케줄 등록 완료: ${batch_name} (ID: ${id}, Schedule: ${cron_schedule}) - 스케줄 시작됨`);
|
2025-09-25 11:04:16 +09:00
|
|
|
} catch (error) {
|
|
|
|
|
logger.error(`배치 스케줄 등록 실패 (ID: ${config.id}):`, error);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 배치 설정 스케줄 제거
|
|
|
|
|
*/
|
|
|
|
|
static async unscheduleBatchConfig(batchConfigId: number) {
|
|
|
|
|
try {
|
|
|
|
|
if (this.scheduledTasks.has(batchConfigId)) {
|
|
|
|
|
this.scheduledTasks.get(batchConfigId)?.stop();
|
|
|
|
|
this.scheduledTasks.delete(batchConfigId);
|
|
|
|
|
logger.info(`배치 스케줄 제거 완료 (ID: ${batchConfigId})`);
|
|
|
|
|
}
|
|
|
|
|
} catch (error) {
|
|
|
|
|
logger.error(`배치 스케줄 제거 실패 (ID: ${batchConfigId}):`, error);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 배치 설정 업데이트 시 스케줄 재등록
|
|
|
|
|
*/
|
2025-09-29 13:48:59 +09:00
|
|
|
static async updateBatchSchedule(configId: number, executeImmediately: boolean = true) {
|
2025-09-25 11:04:16 +09:00
|
|
|
try {
|
|
|
|
|
// 기존 스케줄 제거
|
|
|
|
|
await this.unscheduleBatchConfig(configId);
|
|
|
|
|
|
|
|
|
|
// 업데이트된 배치 설정 조회
|
2025-10-01 13:30:20 +09:00
|
|
|
const configResult = await query<any>(
|
|
|
|
|
`SELECT
|
|
|
|
|
bc.*,
|
|
|
|
|
json_agg(
|
|
|
|
|
json_build_object(
|
|
|
|
|
'id', bm.id,
|
|
|
|
|
'batch_config_id', bm.batch_config_id,
|
|
|
|
|
'field_name', bm.field_name,
|
|
|
|
|
'source_field', bm.source_field,
|
|
|
|
|
'field_type', bm.field_type,
|
|
|
|
|
'is_required', bm.is_required,
|
|
|
|
|
'default_value', bm.default_value,
|
|
|
|
|
'transform_function', bm.transform_function,
|
|
|
|
|
'sort_order', bm.sort_order
|
|
|
|
|
)
|
|
|
|
|
) FILTER (WHERE bm.id IS NOT NULL) as batch_mappings
|
|
|
|
|
FROM batch_configs bc
|
|
|
|
|
LEFT JOIN batch_mappings bm ON bc.id = bm.batch_config_id
|
|
|
|
|
WHERE bc.id = $1
|
|
|
|
|
GROUP BY bc.id`,
|
|
|
|
|
[configId]
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
const config = configResult[0] || null;
|
2025-09-25 11:04:16 +09:00
|
|
|
|
|
|
|
|
if (!config) {
|
|
|
|
|
logger.warn(`배치 설정을 찾을 수 없습니다: ID ${configId}`);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 활성화된 배치만 다시 스케줄 등록
|
|
|
|
|
if (config.is_active === 'Y') {
|
|
|
|
|
await this.scheduleBatchConfig(config);
|
|
|
|
|
logger.info(`배치 스케줄 업데이트 완료: ${config.batch_name} (ID: ${configId})`);
|
2025-09-29 13:48:59 +09:00
|
|
|
|
|
|
|
|
// 활성화 시 즉시 실행 (옵션)
|
|
|
|
|
if (executeImmediately) {
|
|
|
|
|
logger.info(`🚀 배치 활성화 즉시 실행: ${config.batch_name} (ID: ${configId})`);
|
|
|
|
|
await this.executeBatchConfig(config);
|
|
|
|
|
}
|
2025-09-25 11:04:16 +09:00
|
|
|
} else {
|
|
|
|
|
logger.info(`비활성화된 배치 스케줄 제거: ${config.batch_name} (ID: ${configId})`);
|
|
|
|
|
}
|
|
|
|
|
} catch (error) {
|
|
|
|
|
logger.error(`배치 스케줄 업데이트 실패: ID ${configId}`, error);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 배치 설정 실행
|
|
|
|
|
*/
|
2025-09-29 16:55:37 +09:00
|
|
|
static async executeBatchConfig(config: any) {
|
2025-09-25 11:04:16 +09:00
|
|
|
const startTime = new Date();
|
|
|
|
|
let executionLog: any = null;
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
logger.info(`배치 실행 시작: ${config.batch_name} (ID: ${config.id})`);
|
|
|
|
|
|
|
|
|
|
// 실행 로그 생성
|
|
|
|
|
const executionLogResponse = await BatchExecutionLogService.createExecutionLog({
|
|
|
|
|
batch_config_id: config.id,
|
|
|
|
|
execution_status: 'RUNNING',
|
|
|
|
|
start_time: startTime,
|
|
|
|
|
total_records: 0,
|
|
|
|
|
success_records: 0,
|
|
|
|
|
failed_records: 0
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
if (!executionLogResponse.success || !executionLogResponse.data) {
|
|
|
|
|
logger.error(`배치 실행 로그 생성 실패: ${config.batch_name}`, executionLogResponse.message);
|
2025-09-29 16:55:37 +09:00
|
|
|
return {
|
|
|
|
|
totalRecords: 0,
|
|
|
|
|
successRecords: 0,
|
|
|
|
|
failedRecords: 1
|
|
|
|
|
};
|
2025-09-25 11:04:16 +09:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
executionLog = executionLogResponse.data;
|
|
|
|
|
|
|
|
|
|
// 실제 배치 실행 로직 (수동 실행과 동일한 로직 사용)
|
|
|
|
|
const result = await this.executeBatchMappings(config);
|
|
|
|
|
|
|
|
|
|
// 실행 로그 업데이트 (성공)
|
|
|
|
|
await BatchExecutionLogService.updateExecutionLog(executionLog.id, {
|
|
|
|
|
execution_status: 'SUCCESS',
|
|
|
|
|
end_time: new Date(),
|
|
|
|
|
duration_ms: Date.now() - startTime.getTime(),
|
|
|
|
|
total_records: result.totalRecords,
|
|
|
|
|
success_records: result.successRecords,
|
|
|
|
|
failed_records: result.failedRecords
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
logger.info(`배치 실행 완료: ${config.batch_name} (처리된 레코드: ${result.totalRecords})`);
|
2025-09-29 16:55:37 +09:00
|
|
|
|
|
|
|
|
// 성공 결과 반환
|
|
|
|
|
return result;
|
|
|
|
|
|
2025-09-25 11:04:16 +09:00
|
|
|
} catch (error) {
|
|
|
|
|
logger.error(`배치 실행 실패: ${config.batch_name}`, error);
|
|
|
|
|
|
|
|
|
|
// 실행 로그 업데이트 (실패)
|
|
|
|
|
if (executionLog) {
|
|
|
|
|
await BatchExecutionLogService.updateExecutionLog(executionLog.id, {
|
|
|
|
|
execution_status: 'FAILED',
|
|
|
|
|
end_time: new Date(),
|
|
|
|
|
duration_ms: Date.now() - startTime.getTime(),
|
|
|
|
|
error_message: error instanceof Error ? error.message : '알 수 없는 오류',
|
|
|
|
|
error_details: error instanceof Error ? error.stack : String(error)
|
|
|
|
|
});
|
|
|
|
|
}
|
2025-09-29 16:55:37 +09:00
|
|
|
|
|
|
|
|
// 실패 시에도 결과 반환
|
|
|
|
|
return {
|
|
|
|
|
totalRecords: 0,
|
|
|
|
|
successRecords: 0,
|
|
|
|
|
failedRecords: 1
|
|
|
|
|
};
|
2025-09-25 11:04:16 +09:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 배치 매핑 실행 (수동 실행과 동일한 로직)
|
|
|
|
|
*/
|
|
|
|
|
private static async executeBatchMappings(config: any) {
|
|
|
|
|
let totalRecords = 0;
|
|
|
|
|
let successRecords = 0;
|
|
|
|
|
let failedRecords = 0;
|
|
|
|
|
|
|
|
|
|
if (!config.batch_mappings || config.batch_mappings.length === 0) {
|
|
|
|
|
logger.warn(`배치 매핑이 없습니다: ${config.batch_name}`);
|
|
|
|
|
return { totalRecords, successRecords, failedRecords };
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 테이블별로 매핑을 그룹화
|
|
|
|
|
const tableGroups = new Map<string, typeof config.batch_mappings>();
|
|
|
|
|
|
|
|
|
|
for (const mapping of config.batch_mappings) {
|
|
|
|
|
const key = `${mapping.from_connection_type}:${mapping.from_connection_id || 'internal'}:${mapping.from_table_name}`;
|
|
|
|
|
if (!tableGroups.has(key)) {
|
|
|
|
|
tableGroups.set(key, []);
|
|
|
|
|
}
|
|
|
|
|
tableGroups.get(key)!.push(mapping);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 각 테이블 그룹별로 처리
|
|
|
|
|
for (const [tableKey, mappings] of tableGroups) {
|
|
|
|
|
try {
|
|
|
|
|
const firstMapping = mappings[0];
|
|
|
|
|
logger.info(`테이블 처리 시작: ${tableKey} -> ${mappings.length}개 컬럼 매핑`);
|
|
|
|
|
|
2025-09-26 17:29:20 +09:00
|
|
|
let fromData: any[] = [];
|
|
|
|
|
|
|
|
|
|
// FROM 데이터 조회 (DB 또는 REST API)
|
|
|
|
|
if (firstMapping.from_connection_type === 'restapi') {
|
|
|
|
|
// REST API에서 데이터 조회
|
|
|
|
|
logger.info(`REST API에서 데이터 조회: ${firstMapping.from_api_url}${firstMapping.from_table_name}`);
|
|
|
|
|
const { BatchExternalDbService } = await import('./batchExternalDbService');
|
|
|
|
|
const apiResult = await BatchExternalDbService.getDataFromRestApi(
|
|
|
|
|
firstMapping.from_api_url!,
|
|
|
|
|
firstMapping.from_api_key!,
|
|
|
|
|
firstMapping.from_table_name,
|
|
|
|
|
firstMapping.from_api_method as 'GET' | 'POST' | 'PUT' | 'DELETE' || 'GET',
|
2025-09-29 13:48:59 +09:00
|
|
|
mappings.map((m: any) => m.from_column_name),
|
|
|
|
|
100, // limit
|
|
|
|
|
// 파라미터 정보 전달
|
|
|
|
|
firstMapping.from_api_param_type,
|
|
|
|
|
firstMapping.from_api_param_name,
|
|
|
|
|
firstMapping.from_api_param_value,
|
|
|
|
|
firstMapping.from_api_param_source
|
2025-09-26 17:29:20 +09:00
|
|
|
);
|
|
|
|
|
|
|
|
|
|
if (apiResult.success && apiResult.data) {
|
|
|
|
|
fromData = apiResult.data;
|
|
|
|
|
} else {
|
|
|
|
|
throw new Error(`REST API 데이터 조회 실패: ${apiResult.message}`);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
// DB에서 데이터 조회
|
|
|
|
|
const fromColumns = mappings.map((m: any) => m.from_column_name);
|
|
|
|
|
fromData = await BatchService.getDataFromTableWithColumns(
|
|
|
|
|
firstMapping.from_table_name,
|
|
|
|
|
fromColumns,
|
|
|
|
|
firstMapping.from_connection_type as 'internal' | 'external',
|
|
|
|
|
firstMapping.from_connection_id || undefined
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
2025-09-25 11:04:16 +09:00
|
|
|
totalRecords += fromData.length;
|
|
|
|
|
|
|
|
|
|
// 컬럼 매핑 적용하여 TO 테이블 형식으로 변환
|
|
|
|
|
const mappedData = fromData.map(row => {
|
|
|
|
|
const mappedRow: any = {};
|
|
|
|
|
for (const mapping of mappings) {
|
2025-09-26 17:29:20 +09:00
|
|
|
// DB → REST API 배치인지 확인
|
|
|
|
|
if (firstMapping.to_connection_type === 'restapi' && mapping.to_api_body) {
|
|
|
|
|
// DB → REST API: 원본 컬럼명을 키로 사용 (템플릿 처리용)
|
|
|
|
|
mappedRow[mapping.from_column_name] = row[mapping.from_column_name];
|
|
|
|
|
} else {
|
|
|
|
|
// 기존 로직: to_column_name을 키로 사용
|
|
|
|
|
mappedRow[mapping.to_column_name] = row[mapping.from_column_name];
|
|
|
|
|
}
|
2025-09-25 11:04:16 +09:00
|
|
|
}
|
|
|
|
|
return mappedRow;
|
|
|
|
|
});
|
|
|
|
|
|
2025-09-26 17:29:20 +09:00
|
|
|
// TO 테이블에 데이터 삽입 (DB 또는 REST API)
|
|
|
|
|
let insertResult: { successCount: number; failedCount: number };
|
|
|
|
|
|
|
|
|
|
if (firstMapping.to_connection_type === 'restapi') {
|
|
|
|
|
// REST API로 데이터 전송
|
|
|
|
|
logger.info(`REST API로 데이터 전송: ${firstMapping.to_api_url}${firstMapping.to_table_name}`);
|
|
|
|
|
const { BatchExternalDbService } = await import('./batchExternalDbService');
|
|
|
|
|
|
|
|
|
|
// DB → REST API 배치인지 확인 (to_api_body가 있으면 템플릿 기반)
|
|
|
|
|
const hasTemplate = mappings.some((m: any) => m.to_api_body);
|
|
|
|
|
|
|
|
|
|
if (hasTemplate) {
|
|
|
|
|
// 템플릿 기반 REST API 전송 (DB → REST API 배치)
|
|
|
|
|
const templateBody = firstMapping.to_api_body || '{}';
|
|
|
|
|
logger.info(`템플릿 기반 REST API 전송, Request Body 템플릿: ${templateBody}`);
|
|
|
|
|
|
|
|
|
|
// URL 경로 컬럼 찾기 (PUT/DELETE용)
|
|
|
|
|
const urlPathColumn = mappings.find((m: any) => m.to_column_name === 'URL_PATH_PARAM')?.from_column_name;
|
|
|
|
|
|
|
|
|
|
const apiResult = await BatchExternalDbService.sendDataToRestApiWithTemplate(
|
|
|
|
|
firstMapping.to_api_url!,
|
|
|
|
|
firstMapping.to_api_key!,
|
|
|
|
|
firstMapping.to_table_name,
|
|
|
|
|
firstMapping.to_api_method as 'POST' | 'PUT' | 'DELETE' || 'POST',
|
|
|
|
|
templateBody,
|
|
|
|
|
mappedData,
|
|
|
|
|
urlPathColumn
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
if (apiResult.success && apiResult.data) {
|
|
|
|
|
insertResult = apiResult.data;
|
|
|
|
|
} else {
|
|
|
|
|
throw new Error(`템플릿 기반 REST API 데이터 전송 실패: ${apiResult.message}`);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
// 기존 REST API 전송 (REST API → DB 배치)
|
|
|
|
|
const apiResult = await BatchExternalDbService.sendDataToRestApi(
|
|
|
|
|
firstMapping.to_api_url!,
|
|
|
|
|
firstMapping.to_api_key!,
|
|
|
|
|
firstMapping.to_table_name,
|
|
|
|
|
firstMapping.to_api_method as 'POST' | 'PUT' || 'POST',
|
|
|
|
|
mappedData
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
if (apiResult.success && apiResult.data) {
|
|
|
|
|
insertResult = apiResult.data;
|
|
|
|
|
} else {
|
|
|
|
|
throw new Error(`REST API 데이터 전송 실패: ${apiResult.message}`);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
// DB에 데이터 삽입
|
|
|
|
|
insertResult = await BatchService.insertDataToTable(
|
|
|
|
|
firstMapping.to_table_name,
|
|
|
|
|
mappedData,
|
|
|
|
|
firstMapping.to_connection_type as 'internal' | 'external',
|
|
|
|
|
firstMapping.to_connection_id || undefined
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
2025-09-25 11:04:16 +09:00
|
|
|
successRecords += insertResult.successCount;
|
|
|
|
|
failedRecords += insertResult.failedCount;
|
|
|
|
|
|
|
|
|
|
logger.info(`테이블 처리 완료: ${insertResult.successCount}개 성공, ${insertResult.failedCount}개 실패`);
|
|
|
|
|
} catch (error) {
|
|
|
|
|
logger.error(`테이블 처리 실패: ${tableKey}`, error);
|
|
|
|
|
failedRecords += 1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return { totalRecords, successRecords, failedRecords };
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 배치 매핑 처리 (기존 메서드 - 사용 안 함)
|
|
|
|
|
*/
|
|
|
|
|
private static async processBatchMappings(config: any) {
|
|
|
|
|
const { batch_mappings } = config;
|
|
|
|
|
let totalRecords = 0;
|
|
|
|
|
let successRecords = 0;
|
|
|
|
|
let failedRecords = 0;
|
|
|
|
|
|
|
|
|
|
if (!batch_mappings || batch_mappings.length === 0) {
|
|
|
|
|
logger.warn(`배치 매핑이 없습니다: ${config.batch_name}`);
|
|
|
|
|
return { totalRecords, successRecords, failedRecords };
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (const mapping of batch_mappings) {
|
|
|
|
|
try {
|
|
|
|
|
logger.info(`매핑 처리 시작: ${mapping.from_table_name} -> ${mapping.to_table_name}`);
|
|
|
|
|
|
|
|
|
|
// FROM 테이블에서 데이터 조회
|
|
|
|
|
const fromData = await this.getDataFromSource(mapping);
|
|
|
|
|
totalRecords += fromData.length;
|
|
|
|
|
|
|
|
|
|
// TO 테이블에 데이터 삽입
|
|
|
|
|
const insertResult = await this.insertDataToTarget(mapping, fromData);
|
|
|
|
|
successRecords += insertResult.successCount;
|
|
|
|
|
failedRecords += insertResult.failedCount;
|
|
|
|
|
|
|
|
|
|
logger.info(`매핑 처리 완료: ${insertResult.successCount}개 성공, ${insertResult.failedCount}개 실패`);
|
|
|
|
|
} catch (error) {
|
|
|
|
|
logger.error(`매핑 처리 실패: ${mapping.from_table_name} -> ${mapping.to_table_name}`, error);
|
|
|
|
|
failedRecords += 1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return { totalRecords, successRecords, failedRecords };
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* FROM 테이블에서 데이터 조회
|
|
|
|
|
*/
|
|
|
|
|
private static async getDataFromSource(mapping: any) {
|
|
|
|
|
try {
|
|
|
|
|
if (mapping.from_connection_type === 'internal') {
|
|
|
|
|
// 내부 DB에서 조회
|
2025-10-01 13:30:20 +09:00
|
|
|
const result = await query<any>(
|
|
|
|
|
`SELECT * FROM ${mapping.from_table_name}`,
|
|
|
|
|
[]
|
2025-09-25 11:04:16 +09:00
|
|
|
);
|
2025-10-01 13:30:20 +09:00
|
|
|
return result;
|
2025-09-25 11:04:16 +09:00
|
|
|
} else {
|
|
|
|
|
// 외부 DB에서 조회 (구현 필요)
|
|
|
|
|
logger.warn('외부 DB 조회는 아직 구현되지 않았습니다.');
|
|
|
|
|
return [];
|
|
|
|
|
}
|
|
|
|
|
} catch (error) {
|
|
|
|
|
logger.error(`FROM 테이블 데이터 조회 실패: ${mapping.from_table_name}`, error);
|
|
|
|
|
throw error;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* TO 테이블에 데이터 삽입
|
|
|
|
|
*/
|
|
|
|
|
private static async insertDataToTarget(mapping: any, data: any[]) {
|
|
|
|
|
let successCount = 0;
|
|
|
|
|
let failedCount = 0;
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
if (mapping.to_connection_type === 'internal') {
|
|
|
|
|
// 내부 DB에 삽입
|
|
|
|
|
for (const record of data) {
|
|
|
|
|
try {
|
|
|
|
|
// 매핑된 컬럼만 추출
|
|
|
|
|
const mappedData = this.mapColumns(record, mapping);
|
|
|
|
|
|
2025-10-01 13:30:20 +09:00
|
|
|
const columns = Object.keys(mappedData);
|
|
|
|
|
const values = Object.values(mappedData);
|
|
|
|
|
const placeholders = values.map((_, i) => `$${i + 1}`).join(', ');
|
|
|
|
|
|
|
|
|
|
await query(
|
|
|
|
|
`INSERT INTO ${mapping.to_table_name} (${columns.join(', ')}) VALUES (${placeholders})`,
|
|
|
|
|
values
|
2025-09-25 11:04:16 +09:00
|
|
|
);
|
|
|
|
|
successCount++;
|
|
|
|
|
} catch (error) {
|
|
|
|
|
logger.error(`레코드 삽입 실패:`, error);
|
|
|
|
|
failedCount++;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
// 외부 DB에 삽입 (구현 필요)
|
|
|
|
|
logger.warn('외부 DB 삽입은 아직 구현되지 않았습니다.');
|
|
|
|
|
failedCount = data.length;
|
|
|
|
|
}
|
|
|
|
|
} catch (error) {
|
|
|
|
|
logger.error(`TO 테이블 데이터 삽입 실패: ${mapping.to_table_name}`, error);
|
|
|
|
|
throw error;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return { successCount, failedCount };
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 컬럼 매핑
|
|
|
|
|
*/
|
|
|
|
|
private static mapColumns(record: any, mapping: any) {
|
|
|
|
|
const mappedData: any = {};
|
|
|
|
|
|
|
|
|
|
// 단순한 컬럼 매핑 (실제로는 더 복잡한 로직 필요)
|
|
|
|
|
mappedData[mapping.to_column_name] = record[mapping.from_column_name];
|
|
|
|
|
|
|
|
|
|
return mappedData;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 모든 스케줄 중지
|
|
|
|
|
*/
|
|
|
|
|
static async stopAllSchedules() {
|
|
|
|
|
try {
|
|
|
|
|
for (const [id, task] of this.scheduledTasks) {
|
|
|
|
|
task.stop();
|
|
|
|
|
logger.info(`배치 스케줄 중지: ID ${id}`);
|
|
|
|
|
}
|
|
|
|
|
this.scheduledTasks.clear();
|
|
|
|
|
this.isInitialized = false;
|
|
|
|
|
logger.info('모든 배치 스케줄이 중지되었습니다.');
|
|
|
|
|
} catch (error) {
|
|
|
|
|
logger.error('배치 스케줄 중지 실패:', error);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 현재 등록된 스케줄 목록 조회
|
|
|
|
|
*/
|
|
|
|
|
static getScheduledTasks() {
|
|
|
|
|
return Array.from(this.scheduledTasks.keys());
|
|
|
|
|
}
|
|
|
|
|
}
|