fix: 배치 스케줄러 컬럼명 수정

batch_mappings 테이블의 실제 컬럼명으로 수정:
- field_name → from_column_name, to_column_name
- source_field → from_table_name, to_table_name
- 전체 컬럼 구조를 실제 DB 스키마에 맞게 수정

수정된 함수:
- loadActiveBatchConfigs()
- updateBatchSchedule()

에러 해결: column bm.field_name does not exist
This commit is contained in:
kjs 2025-10-01 13:34:56 +09:00
parent 505f656c15
commit 13a9521977
1 changed files with 215 additions and 123 deletions

View File

@ -1,11 +1,11 @@
// 배치 스케줄러 서비스
// 작성일: 2024-12-24
import * as cron from 'node-cron';
import { query, queryOne } from '../database/db';
import { BatchService } from './batchService';
import { BatchExecutionLogService } from './batchExecutionLogService';
import { logger } from '../utils/logger';
import * as cron from "node-cron";
import { query, queryOne } from "../database/db";
import { BatchService } from "./batchService";
import { BatchExecutionLogService } from "./batchExecutionLogService";
import { logger } from "../utils/logger";
export class BatchSchedulerService {
private static scheduledTasks: Map<number, cron.ScheduledTask> = new Map();
@ -17,18 +17,18 @@ export class BatchSchedulerService {
*/
static async initialize() {
try {
logger.info('배치 스케줄러 초기화 시작...');
logger.info("배치 스케줄러 초기화 시작...");
// 기존 모든 스케줄 정리 (중복 방지)
this.clearAllSchedules();
// 활성화된 배치 설정들을 로드하여 스케줄 등록
await this.loadActiveBatchConfigs();
this.isInitialized = true;
logger.info('배치 스케줄러 초기화 완료');
logger.info("배치 스케줄러 초기화 완료");
} catch (error) {
logger.error('배치 스케줄러 초기화 실패:', error);
logger.error("배치 스케줄러 초기화 실패:", error);
throw error;
}
}
@ -38,7 +38,7 @@ export class BatchSchedulerService {
*/
private static clearAllSchedules() {
logger.info(`기존 스케줄 ${this.scheduledTasks.size}개 정리 중...`);
for (const [id, task] of this.scheduledTasks) {
try {
task.stop();
@ -48,10 +48,10 @@ export class BatchSchedulerService {
logger.error(`스케줄 정리 실패: ID ${id}`, error);
}
}
this.scheduledTasks.clear();
this.isInitialized = false;
logger.info('모든 스케줄 정리 완료');
logger.info("모든 스케줄 정리 완료");
}
/**
@ -66,13 +66,28 @@ export class BatchSchedulerService {
json_build_object(
'id', bm.id,
'batch_config_id', bm.batch_config_id,
'field_name', bm.field_name,
'source_field', bm.source_field,
'field_type', bm.field_type,
'is_required', bm.is_required,
'default_value', bm.default_value,
'transform_function', bm.transform_function,
'sort_order', bm.sort_order
'from_connection_type', bm.from_connection_type,
'from_connection_id', bm.from_connection_id,
'from_table_name', bm.from_table_name,
'from_column_name', bm.from_column_name,
'from_column_type', bm.from_column_type,
'to_connection_type', bm.to_connection_type,
'to_connection_id', bm.to_connection_id,
'to_table_name', bm.to_table_name,
'to_column_name', bm.to_column_name,
'to_column_type', bm.to_column_type,
'mapping_order', bm.mapping_order,
'from_api_url', bm.from_api_url,
'from_api_key', bm.from_api_key,
'from_api_method', bm.from_api_method,
'from_api_param_type', bm.from_api_param_type,
'from_api_param_name', bm.from_api_param_name,
'from_api_param_value', bm.from_api_param_value,
'from_api_param_source', bm.from_api_param_source,
'to_api_url', bm.to_api_url,
'to_api_key', bm.to_api_key,
'to_api_method', bm.to_api_method,
'to_api_body', bm.to_api_body
)
) FILTER (WHERE bm.id IS NOT NULL) as batch_mappings
FROM batch_configs bc
@ -88,7 +103,7 @@ export class BatchSchedulerService {
await this.scheduleBatchConfig(config);
}
} catch (error) {
logger.error('활성화된 배치 설정 로드 실패:', error);
logger.error("활성화된 배치 설정 로드 실패:", error);
throw error;
}
}
@ -116,15 +131,17 @@ export class BatchSchedulerService {
const task = cron.schedule(cron_schedule, async () => {
// 중복 실행 방지 체크
if (this.executingBatches.has(id)) {
logger.warn(`⚠️ 배치가 이미 실행 중입니다. 건너뜀: ${batch_name} (ID: ${id})`);
logger.warn(
`⚠️ 배치가 이미 실행 중입니다. 건너뜀: ${batch_name} (ID: ${id})`
);
return;
}
logger.info(`🔄 스케줄 배치 실행 시작: ${batch_name} (ID: ${id})`);
// 실행 중 플래그 설정
this.executingBatches.add(id);
try {
await this.executeBatchConfig(config);
} finally {
@ -135,9 +152,11 @@ export class BatchSchedulerService {
// 스케줄 시작 (기본적으로 시작되지만 명시적으로 호출)
task.start();
this.scheduledTasks.set(id, task);
logger.info(`배치 스케줄 등록 완료: ${batch_name} (ID: ${id}, Schedule: ${cron_schedule}) - 스케줄 시작됨`);
logger.info(
`배치 스케줄 등록 완료: ${batch_name} (ID: ${id}, Schedule: ${cron_schedule}) - 스케줄 시작됨`
);
} catch (error) {
logger.error(`배치 스케줄 등록 실패 (ID: ${config.id}):`, error);
}
@ -161,7 +180,10 @@ export class BatchSchedulerService {
/**
*
*/
static async updateBatchSchedule(configId: number, executeImmediately: boolean = true) {
static async updateBatchSchedule(
configId: number,
executeImmediately: boolean = true
) {
try {
// 기존 스케줄 제거
await this.unscheduleBatchConfig(configId);
@ -174,13 +196,28 @@ export class BatchSchedulerService {
json_build_object(
'id', bm.id,
'batch_config_id', bm.batch_config_id,
'field_name', bm.field_name,
'source_field', bm.source_field,
'field_type', bm.field_type,
'is_required', bm.is_required,
'default_value', bm.default_value,
'transform_function', bm.transform_function,
'sort_order', bm.sort_order
'from_connection_type', bm.from_connection_type,
'from_connection_id', bm.from_connection_id,
'from_table_name', bm.from_table_name,
'from_column_name', bm.from_column_name,
'from_column_type', bm.from_column_type,
'to_connection_type', bm.to_connection_type,
'to_connection_id', bm.to_connection_id,
'to_table_name', bm.to_table_name,
'to_column_name', bm.to_column_name,
'to_column_type', bm.to_column_type,
'mapping_order', bm.mapping_order,
'from_api_url', bm.from_api_url,
'from_api_key', bm.from_api_key,
'from_api_method', bm.from_api_method,
'from_api_param_type', bm.from_api_param_type,
'from_api_param_name', bm.from_api_param_name,
'from_api_param_value', bm.from_api_param_value,
'from_api_param_source', bm.from_api_param_source,
'to_api_url', bm.to_api_url,
'to_api_key', bm.to_api_key,
'to_api_method', bm.to_api_method,
'to_api_body', bm.to_api_body
)
) FILTER (WHERE bm.id IS NOT NULL) as batch_mappings
FROM batch_configs bc
@ -189,7 +226,7 @@ export class BatchSchedulerService {
GROUP BY bc.id`,
[configId]
);
const config = configResult[0] || null;
if (!config) {
@ -198,17 +235,23 @@ export class BatchSchedulerService {
}
// 활성화된 배치만 다시 스케줄 등록
if (config.is_active === 'Y') {
if (config.is_active === "Y") {
await this.scheduleBatchConfig(config);
logger.info(`배치 스케줄 업데이트 완료: ${config.batch_name} (ID: ${configId})`);
logger.info(
`배치 스케줄 업데이트 완료: ${config.batch_name} (ID: ${configId})`
);
// 활성화 시 즉시 실행 (옵션)
if (executeImmediately) {
logger.info(`🚀 배치 활성화 즉시 실행: ${config.batch_name} (ID: ${configId})`);
logger.info(
`🚀 배치 활성화 즉시 실행: ${config.batch_name} (ID: ${configId})`
);
await this.executeBatchConfig(config);
}
} else {
logger.info(`비활성화된 배치 스케줄 제거: ${config.batch_name} (ID: ${configId})`);
logger.info(
`비활성화된 배치 스케줄 제거: ${config.batch_name} (ID: ${configId})`
);
}
} catch (error) {
logger.error(`배치 스케줄 업데이트 실패: ID ${configId}`, error);
@ -226,21 +269,25 @@ export class BatchSchedulerService {
logger.info(`배치 실행 시작: ${config.batch_name} (ID: ${config.id})`);
// 실행 로그 생성
const executionLogResponse = await BatchExecutionLogService.createExecutionLog({
batch_config_id: config.id,
execution_status: 'RUNNING',
start_time: startTime,
total_records: 0,
success_records: 0,
failed_records: 0
});
const executionLogResponse =
await BatchExecutionLogService.createExecutionLog({
batch_config_id: config.id,
execution_status: "RUNNING",
start_time: startTime,
total_records: 0,
success_records: 0,
failed_records: 0,
});
if (!executionLogResponse.success || !executionLogResponse.data) {
logger.error(`배치 실행 로그 생성 실패: ${config.batch_name}`, executionLogResponse.message);
logger.error(
`배치 실행 로그 생성 실패: ${config.batch_name}`,
executionLogResponse.message
);
return {
totalRecords: 0,
successRecords: 0,
failedRecords: 1
failedRecords: 1,
};
}
@ -251,38 +298,40 @@ export class BatchSchedulerService {
// 실행 로그 업데이트 (성공)
await BatchExecutionLogService.updateExecutionLog(executionLog.id, {
execution_status: 'SUCCESS',
execution_status: "SUCCESS",
end_time: new Date(),
duration_ms: Date.now() - startTime.getTime(),
total_records: result.totalRecords,
success_records: result.successRecords,
failed_records: result.failedRecords
failed_records: result.failedRecords,
});
logger.info(`배치 실행 완료: ${config.batch_name} (처리된 레코드: ${result.totalRecords})`);
logger.info(
`배치 실행 완료: ${config.batch_name} (처리된 레코드: ${result.totalRecords})`
);
// 성공 결과 반환
return result;
} catch (error) {
logger.error(`배치 실행 실패: ${config.batch_name}`, error);
// 실행 로그 업데이트 (실패)
if (executionLog) {
await BatchExecutionLogService.updateExecutionLog(executionLog.id, {
execution_status: 'FAILED',
execution_status: "FAILED",
end_time: new Date(),
duration_ms: Date.now() - startTime.getTime(),
error_message: error instanceof Error ? error.message : '알 수 없는 오류',
error_details: error instanceof Error ? error.stack : String(error)
error_message:
error instanceof Error ? error.message : "알 수 없는 오류",
error_details: error instanceof Error ? error.stack : String(error),
});
}
// 실패 시에도 결과 반환
return {
totalRecords: 0,
successRecords: 0,
failedRecords: 1
failedRecords: 1,
};
}
}
@ -302,9 +351,9 @@ export class BatchSchedulerService {
// 테이블별로 매핑을 그룹화
const tableGroups = new Map<string, typeof config.batch_mappings>();
for (const mapping of config.batch_mappings) {
const key = `${mapping.from_connection_type}:${mapping.from_connection_id || 'internal'}:${mapping.from_table_name}`;
const key = `${mapping.from_connection_type}:${mapping.from_connection_id || "internal"}:${mapping.from_table_name}`;
if (!tableGroups.has(key)) {
tableGroups.set(key, []);
}
@ -315,20 +364,30 @@ export class BatchSchedulerService {
for (const [tableKey, mappings] of tableGroups) {
try {
const firstMapping = mappings[0];
logger.info(`테이블 처리 시작: ${tableKey} -> ${mappings.length}개 컬럼 매핑`);
logger.info(
`테이블 처리 시작: ${tableKey} -> ${mappings.length}개 컬럼 매핑`
);
let fromData: any[] = [];
// FROM 데이터 조회 (DB 또는 REST API)
if (firstMapping.from_connection_type === 'restapi') {
if (firstMapping.from_connection_type === "restapi") {
// REST API에서 데이터 조회
logger.info(`REST API에서 데이터 조회: ${firstMapping.from_api_url}${firstMapping.from_table_name}`);
const { BatchExternalDbService } = await import('./batchExternalDbService');
logger.info(
`REST API에서 데이터 조회: ${firstMapping.from_api_url}${firstMapping.from_table_name}`
);
const { BatchExternalDbService } = await import(
"./batchExternalDbService"
);
const apiResult = await BatchExternalDbService.getDataFromRestApi(
firstMapping.from_api_url!,
firstMapping.from_api_key!,
firstMapping.from_table_name,
firstMapping.from_api_method as 'GET' | 'POST' | 'PUT' | 'DELETE' || 'GET',
(firstMapping.from_api_method as
| "GET"
| "POST"
| "PUT"
| "DELETE") || "GET",
mappings.map((m: any) => m.from_column_name),
100, // limit
// 파라미터 정보 전달
@ -337,7 +396,7 @@ export class BatchSchedulerService {
firstMapping.from_api_param_value,
firstMapping.from_api_param_source
);
if (apiResult.success && apiResult.data) {
fromData = apiResult.data;
} else {
@ -349,21 +408,25 @@ export class BatchSchedulerService {
fromData = await BatchService.getDataFromTableWithColumns(
firstMapping.from_table_name,
fromColumns,
firstMapping.from_connection_type as 'internal' | 'external',
firstMapping.from_connection_type as "internal" | "external",
firstMapping.from_connection_id || undefined
);
}
totalRecords += fromData.length;
// 컬럼 매핑 적용하여 TO 테이블 형식으로 변환
const mappedData = fromData.map(row => {
const mappedData = fromData.map((row) => {
const mappedRow: any = {};
for (const mapping of mappings) {
// DB → REST API 배치인지 확인
if (firstMapping.to_connection_type === 'restapi' && mapping.to_api_body) {
if (
firstMapping.to_connection_type === "restapi" &&
mapping.to_api_body
) {
// DB → REST API: 원본 컬럼명을 키로 사용 (템플릿 처리용)
mappedRow[mapping.from_column_name] = row[mapping.from_column_name];
mappedRow[mapping.from_column_name] =
row[mapping.from_column_name];
} else {
// 기존 로직: to_column_name을 키로 사용
mappedRow[mapping.to_column_name] = row[mapping.from_column_name];
@ -374,37 +437,49 @@ export class BatchSchedulerService {
// TO 테이블에 데이터 삽입 (DB 또는 REST API)
let insertResult: { successCount: number; failedCount: number };
if (firstMapping.to_connection_type === 'restapi') {
if (firstMapping.to_connection_type === "restapi") {
// REST API로 데이터 전송
logger.info(`REST API로 데이터 전송: ${firstMapping.to_api_url}${firstMapping.to_table_name}`);
const { BatchExternalDbService } = await import('./batchExternalDbService');
logger.info(
`REST API로 데이터 전송: ${firstMapping.to_api_url}${firstMapping.to_table_name}`
);
const { BatchExternalDbService } = await import(
"./batchExternalDbService"
);
// DB → REST API 배치인지 확인 (to_api_body가 있으면 템플릿 기반)
const hasTemplate = mappings.some((m: any) => m.to_api_body);
if (hasTemplate) {
// 템플릿 기반 REST API 전송 (DB → REST API 배치)
const templateBody = firstMapping.to_api_body || '{}';
logger.info(`템플릿 기반 REST API 전송, Request Body 템플릿: ${templateBody}`);
// URL 경로 컬럼 찾기 (PUT/DELETE용)
const urlPathColumn = mappings.find((m: any) => m.to_column_name === 'URL_PATH_PARAM')?.from_column_name;
const apiResult = await BatchExternalDbService.sendDataToRestApiWithTemplate(
firstMapping.to_api_url!,
firstMapping.to_api_key!,
firstMapping.to_table_name,
firstMapping.to_api_method as 'POST' | 'PUT' | 'DELETE' || 'POST',
templateBody,
mappedData,
urlPathColumn
);
const templateBody = firstMapping.to_api_body || "{}";
logger.info(
`템플릿 기반 REST API 전송, Request Body 템플릿: ${templateBody}`
);
// URL 경로 컬럼 찾기 (PUT/DELETE용)
const urlPathColumn = mappings.find(
(m: any) => m.to_column_name === "URL_PATH_PARAM"
)?.from_column_name;
const apiResult =
await BatchExternalDbService.sendDataToRestApiWithTemplate(
firstMapping.to_api_url!,
firstMapping.to_api_key!,
firstMapping.to_table_name,
(firstMapping.to_api_method as "POST" | "PUT" | "DELETE") ||
"POST",
templateBody,
mappedData,
urlPathColumn
);
if (apiResult.success && apiResult.data) {
insertResult = apiResult.data;
} else {
throw new Error(`템플릿 기반 REST API 데이터 전송 실패: ${apiResult.message}`);
throw new Error(
`템플릿 기반 REST API 데이터 전송 실패: ${apiResult.message}`
);
}
} else {
// 기존 REST API 전송 (REST API → DB 배치)
@ -412,14 +487,16 @@ export class BatchSchedulerService {
firstMapping.to_api_url!,
firstMapping.to_api_key!,
firstMapping.to_table_name,
firstMapping.to_api_method as 'POST' | 'PUT' || 'POST',
(firstMapping.to_api_method as "POST" | "PUT") || "POST",
mappedData
);
if (apiResult.success && apiResult.data) {
insertResult = apiResult.data;
} else {
throw new Error(`REST API 데이터 전송 실패: ${apiResult.message}`);
throw new Error(
`REST API 데이터 전송 실패: ${apiResult.message}`
);
}
}
} else {
@ -427,15 +504,17 @@ export class BatchSchedulerService {
insertResult = await BatchService.insertDataToTable(
firstMapping.to_table_name,
mappedData,
firstMapping.to_connection_type as 'internal' | 'external',
firstMapping.to_connection_type as "internal" | "external",
firstMapping.to_connection_id || undefined
);
}
successRecords += insertResult.successCount;
failedRecords += insertResult.failedCount;
logger.info(`테이블 처리 완료: ${insertResult.successCount}개 성공, ${insertResult.failedCount}개 실패`);
logger.info(
`테이블 처리 완료: ${insertResult.successCount}개 성공, ${insertResult.failedCount}개 실패`
);
} catch (error) {
logger.error(`테이블 처리 실패: ${tableKey}`, error);
failedRecords += 1;
@ -461,7 +540,9 @@ export class BatchSchedulerService {
for (const mapping of batch_mappings) {
try {
logger.info(`매핑 처리 시작: ${mapping.from_table_name} -> ${mapping.to_table_name}`);
logger.info(
`매핑 처리 시작: ${mapping.from_table_name} -> ${mapping.to_table_name}`
);
// FROM 테이블에서 데이터 조회
const fromData = await this.getDataFromSource(mapping);
@ -472,9 +553,14 @@ export class BatchSchedulerService {
successRecords += insertResult.successCount;
failedRecords += insertResult.failedCount;
logger.info(`매핑 처리 완료: ${insertResult.successCount}개 성공, ${insertResult.failedCount}개 실패`);
logger.info(
`매핑 처리 완료: ${insertResult.successCount}개 성공, ${insertResult.failedCount}개 실패`
);
} catch (error) {
logger.error(`매핑 처리 실패: ${mapping.from_table_name} -> ${mapping.to_table_name}`, error);
logger.error(
`매핑 처리 실패: ${mapping.from_table_name} -> ${mapping.to_table_name}`,
error
);
failedRecords += 1;
}
}
@ -487,7 +573,7 @@ export class BatchSchedulerService {
*/
private static async getDataFromSource(mapping: any) {
try {
if (mapping.from_connection_type === 'internal') {
if (mapping.from_connection_type === "internal") {
// 내부 DB에서 조회
const result = await query<any>(
`SELECT * FROM ${mapping.from_table_name}`,
@ -496,11 +582,14 @@ export class BatchSchedulerService {
return result;
} else {
// 외부 DB에서 조회 (구현 필요)
logger.warn('외부 DB 조회는 아직 구현되지 않았습니다.');
logger.warn("외부 DB 조회는 아직 구현되지 않았습니다.");
return [];
}
} catch (error) {
logger.error(`FROM 테이블 데이터 조회 실패: ${mapping.from_table_name}`, error);
logger.error(
`FROM 테이블 데이터 조회 실패: ${mapping.from_table_name}`,
error
);
throw error;
}
}
@ -513,19 +602,19 @@ export class BatchSchedulerService {
let failedCount = 0;
try {
if (mapping.to_connection_type === 'internal') {
if (mapping.to_connection_type === "internal") {
// 내부 DB에 삽입
for (const record of data) {
try {
// 매핑된 컬럼만 추출
const mappedData = this.mapColumns(record, mapping);
const columns = Object.keys(mappedData);
const values = Object.values(mappedData);
const placeholders = values.map((_, i) => `$${i + 1}`).join(', ');
const placeholders = values.map((_, i) => `$${i + 1}`).join(", ");
await query(
`INSERT INTO ${mapping.to_table_name} (${columns.join(', ')}) VALUES (${placeholders})`,
`INSERT INTO ${mapping.to_table_name} (${columns.join(", ")}) VALUES (${placeholders})`,
values
);
successCount++;
@ -536,11 +625,14 @@ export class BatchSchedulerService {
}
} else {
// 외부 DB에 삽입 (구현 필요)
logger.warn('외부 DB 삽입은 아직 구현되지 않았습니다.');
logger.warn("외부 DB 삽입은 아직 구현되지 않았습니다.");
failedCount = data.length;
}
} catch (error) {
logger.error(`TO 테이블 데이터 삽입 실패: ${mapping.to_table_name}`, error);
logger.error(
`TO 테이블 데이터 삽입 실패: ${mapping.to_table_name}`,
error
);
throw error;
}
@ -552,10 +644,10 @@ export class BatchSchedulerService {
*/
private static mapColumns(record: any, mapping: any) {
const mappedData: any = {};
// 단순한 컬럼 매핑 (실제로는 더 복잡한 로직 필요)
mappedData[mapping.to_column_name] = record[mapping.from_column_name];
return mappedData;
}
@ -570,9 +662,9 @@ export class BatchSchedulerService {
}
this.scheduledTasks.clear();
this.isInitialized = false;
logger.info('모든 배치 스케줄이 중지되었습니다.');
logger.info("모든 배치 스케줄이 중지되었습니다.");
} catch (error) {
logger.error('배치 스케줄 중지 실패:', error);
logger.error("배치 스케줄 중지 실패:", error);
}
}