ERP-node/backend-node/src/services/batchSchedulerService.ts

398 lines
13 KiB
TypeScript
Raw Normal View History

// 배치 스케줄러 서비스
// 작성일: 2024-12-24
import * as cron from 'node-cron';
import prisma from '../config/database';
import { BatchService } from './batchService';
import { BatchExecutionLogService } from './batchExecutionLogService';
import { logger } from '../utils/logger';
export class BatchSchedulerService {
private static scheduledTasks: Map<number, cron.ScheduledTask> = new Map();
private static isInitialized = false;
/**
*
*/
static async initialize() {
if (this.isInitialized) {
logger.info('배치 스케줄러가 이미 초기화되었습니다.');
return;
}
try {
logger.info('배치 스케줄러 초기화 시작...');
// 활성화된 배치 설정들을 로드하여 스케줄 등록
await this.loadActiveBatchConfigs();
this.isInitialized = true;
logger.info('배치 스케줄러 초기화 완료');
} catch (error) {
logger.error('배치 스케줄러 초기화 실패:', error);
throw error;
}
}
/**
*
*/
private static async loadActiveBatchConfigs() {
try {
const activeConfigs = await prisma.batch_configs.findMany({
where: {
is_active: 'Y'
},
include: {
batch_mappings: true
}
});
logger.info(`활성화된 배치 설정 ${activeConfigs.length}개 발견`);
for (const config of activeConfigs) {
await this.scheduleBatchConfig(config);
}
} catch (error) {
logger.error('활성화된 배치 설정 로드 실패:', error);
throw error;
}
}
/**
*
*/
static async scheduleBatchConfig(config: any) {
try {
const { id, batch_name, cron_schedule } = config;
// 기존 스케줄이 있다면 제거
if (this.scheduledTasks.has(id)) {
this.scheduledTasks.get(id)?.stop();
this.scheduledTasks.delete(id);
}
// cron 스케줄 유효성 검사
if (!cron.validate(cron_schedule)) {
logger.error(`잘못된 cron 스케줄: ${cron_schedule} (배치 ID: ${id})`);
return;
}
// 새로운 스케줄 등록
const task = cron.schedule(cron_schedule, async () => {
await this.executeBatchConfig(config);
});
this.scheduledTasks.set(id, task);
logger.info(`배치 스케줄 등록 완료: ${batch_name} (ID: ${id}, Schedule: ${cron_schedule})`);
} catch (error) {
logger.error(`배치 스케줄 등록 실패 (ID: ${config.id}):`, error);
}
}
/**
*
*/
static async unscheduleBatchConfig(batchConfigId: number) {
try {
if (this.scheduledTasks.has(batchConfigId)) {
this.scheduledTasks.get(batchConfigId)?.stop();
this.scheduledTasks.delete(batchConfigId);
logger.info(`배치 스케줄 제거 완료 (ID: ${batchConfigId})`);
}
} catch (error) {
logger.error(`배치 스케줄 제거 실패 (ID: ${batchConfigId}):`, error);
}
}
/**
*
*/
static async updateBatchSchedule(configId: number) {
try {
// 기존 스케줄 제거
await this.unscheduleBatchConfig(configId);
// 업데이트된 배치 설정 조회
const config = await prisma.batch_configs.findUnique({
where: { id: configId },
include: { batch_mappings: true }
});
if (!config) {
logger.warn(`배치 설정을 찾을 수 없습니다: ID ${configId}`);
return;
}
// 활성화된 배치만 다시 스케줄 등록
if (config.is_active === 'Y') {
await this.scheduleBatchConfig(config);
logger.info(`배치 스케줄 업데이트 완료: ${config.batch_name} (ID: ${configId})`);
} else {
logger.info(`비활성화된 배치 스케줄 제거: ${config.batch_name} (ID: ${configId})`);
}
} catch (error) {
logger.error(`배치 스케줄 업데이트 실패: ID ${configId}`, error);
}
}
/**
*
*/
private static async executeBatchConfig(config: any) {
const startTime = new Date();
let executionLog: any = null;
try {
logger.info(`배치 실행 시작: ${config.batch_name} (ID: ${config.id})`);
// 실행 로그 생성
const executionLogResponse = await BatchExecutionLogService.createExecutionLog({
batch_config_id: config.id,
execution_status: 'RUNNING',
start_time: startTime,
total_records: 0,
success_records: 0,
failed_records: 0
});
if (!executionLogResponse.success || !executionLogResponse.data) {
logger.error(`배치 실행 로그 생성 실패: ${config.batch_name}`, executionLogResponse.message);
return;
}
executionLog = executionLogResponse.data;
// 실제 배치 실행 로직 (수동 실행과 동일한 로직 사용)
const result = await this.executeBatchMappings(config);
// 실행 로그 업데이트 (성공)
await BatchExecutionLogService.updateExecutionLog(executionLog.id, {
execution_status: 'SUCCESS',
end_time: new Date(),
duration_ms: Date.now() - startTime.getTime(),
total_records: result.totalRecords,
success_records: result.successRecords,
failed_records: result.failedRecords
});
logger.info(`배치 실행 완료: ${config.batch_name} (처리된 레코드: ${result.totalRecords})`);
} catch (error) {
logger.error(`배치 실행 실패: ${config.batch_name}`, error);
// 실행 로그 업데이트 (실패)
if (executionLog) {
await BatchExecutionLogService.updateExecutionLog(executionLog.id, {
execution_status: 'FAILED',
end_time: new Date(),
duration_ms: Date.now() - startTime.getTime(),
error_message: error instanceof Error ? error.message : '알 수 없는 오류',
error_details: error instanceof Error ? error.stack : String(error)
});
}
}
}
/**
* ( )
*/
private static async executeBatchMappings(config: any) {
let totalRecords = 0;
let successRecords = 0;
let failedRecords = 0;
if (!config.batch_mappings || config.batch_mappings.length === 0) {
logger.warn(`배치 매핑이 없습니다: ${config.batch_name}`);
return { totalRecords, successRecords, failedRecords };
}
// 테이블별로 매핑을 그룹화
const tableGroups = new Map<string, typeof config.batch_mappings>();
for (const mapping of config.batch_mappings) {
const key = `${mapping.from_connection_type}:${mapping.from_connection_id || 'internal'}:${mapping.from_table_name}`;
if (!tableGroups.has(key)) {
tableGroups.set(key, []);
}
tableGroups.get(key)!.push(mapping);
}
// 각 테이블 그룹별로 처리
for (const [tableKey, mappings] of tableGroups) {
try {
const firstMapping = mappings[0];
logger.info(`테이블 처리 시작: ${tableKey} -> ${mappings.length}개 컬럼 매핑`);
// FROM 테이블에서 매핑된 컬럼들만 조회
const fromColumns = mappings.map((m: any) => m.from_column_name);
const fromData = await BatchService.getDataFromTableWithColumns(
firstMapping.from_table_name,
fromColumns,
firstMapping.from_connection_type as 'internal' | 'external',
firstMapping.from_connection_id || undefined
);
totalRecords += fromData.length;
// 컬럼 매핑 적용하여 TO 테이블 형식으로 변환
const mappedData = fromData.map(row => {
const mappedRow: any = {};
for (const mapping of mappings) {
mappedRow[mapping.to_column_name] = row[mapping.from_column_name];
}
return mappedRow;
});
// TO 테이블에 데이터 삽입
const insertResult = await BatchService.insertDataToTable(
firstMapping.to_table_name,
mappedData,
firstMapping.to_connection_type as 'internal' | 'external',
firstMapping.to_connection_id || undefined
);
successRecords += insertResult.successCount;
failedRecords += insertResult.failedCount;
logger.info(`테이블 처리 완료: ${insertResult.successCount}개 성공, ${insertResult.failedCount}개 실패`);
} catch (error) {
logger.error(`테이블 처리 실패: ${tableKey}`, error);
failedRecords += 1;
}
}
return { totalRecords, successRecords, failedRecords };
}
/**
* ( - )
*/
private static async processBatchMappings(config: any) {
const { batch_mappings } = config;
let totalRecords = 0;
let successRecords = 0;
let failedRecords = 0;
if (!batch_mappings || batch_mappings.length === 0) {
logger.warn(`배치 매핑이 없습니다: ${config.batch_name}`);
return { totalRecords, successRecords, failedRecords };
}
for (const mapping of batch_mappings) {
try {
logger.info(`매핑 처리 시작: ${mapping.from_table_name} -> ${mapping.to_table_name}`);
// FROM 테이블에서 데이터 조회
const fromData = await this.getDataFromSource(mapping);
totalRecords += fromData.length;
// TO 테이블에 데이터 삽입
const insertResult = await this.insertDataToTarget(mapping, fromData);
successRecords += insertResult.successCount;
failedRecords += insertResult.failedCount;
logger.info(`매핑 처리 완료: ${insertResult.successCount}개 성공, ${insertResult.failedCount}개 실패`);
} catch (error) {
logger.error(`매핑 처리 실패: ${mapping.from_table_name} -> ${mapping.to_table_name}`, error);
failedRecords += 1;
}
}
return { totalRecords, successRecords, failedRecords };
}
/**
* FROM
*/
private static async getDataFromSource(mapping: any) {
try {
if (mapping.from_connection_type === 'internal') {
// 내부 DB에서 조회
const result = await prisma.$queryRawUnsafe(
`SELECT * FROM ${mapping.from_table_name}`
);
return result as any[];
} else {
// 외부 DB에서 조회 (구현 필요)
logger.warn('외부 DB 조회는 아직 구현되지 않았습니다.');
return [];
}
} catch (error) {
logger.error(`FROM 테이블 데이터 조회 실패: ${mapping.from_table_name}`, error);
throw error;
}
}
/**
* TO
*/
private static async insertDataToTarget(mapping: any, data: any[]) {
let successCount = 0;
let failedCount = 0;
try {
if (mapping.to_connection_type === 'internal') {
// 내부 DB에 삽입
for (const record of data) {
try {
// 매핑된 컬럼만 추출
const mappedData = this.mapColumns(record, mapping);
await prisma.$executeRawUnsafe(
`INSERT INTO ${mapping.to_table_name} (${Object.keys(mappedData).join(', ')}) VALUES (${Object.values(mappedData).map(() => '?').join(', ')})`,
...Object.values(mappedData)
);
successCount++;
} catch (error) {
logger.error(`레코드 삽입 실패:`, error);
failedCount++;
}
}
} else {
// 외부 DB에 삽입 (구현 필요)
logger.warn('외부 DB 삽입은 아직 구현되지 않았습니다.');
failedCount = data.length;
}
} catch (error) {
logger.error(`TO 테이블 데이터 삽입 실패: ${mapping.to_table_name}`, error);
throw error;
}
return { successCount, failedCount };
}
/**
*
*/
private static mapColumns(record: any, mapping: any) {
const mappedData: any = {};
// 단순한 컬럼 매핑 (실제로는 더 복잡한 로직 필요)
mappedData[mapping.to_column_name] = record[mapping.from_column_name];
return mappedData;
}
/**
*
*/
static async stopAllSchedules() {
try {
for (const [id, task] of this.scheduledTasks) {
task.stop();
logger.info(`배치 스케줄 중지: ID ${id}`);
}
this.scheduledTasks.clear();
this.isInitialized = false;
logger.info('모든 배치 스케줄이 중지되었습니다.');
} catch (error) {
logger.error('배치 스케줄 중지 실패:', error);
}
}
/**
*
*/
static getScheduledTasks() {
return Array.from(this.scheduledTasks.keys());
}
}